1use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_pb::hummock::{
27 CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
28};
29use tracing::warn;
30
31use super::group_split::split_sst_with_table_ids;
32use super::{StateTableId, group_split};
33use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
34use crate::compact_task::is_compaction_task_expired;
35use crate::compaction_group::StaticCompactionGroupId;
36use crate::key_range::KeyRangeCommon;
37use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
38use crate::sstable_info::SstableInfo;
39use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
40use crate::vector_index::apply_vector_index_delta;
41use crate::version::{
42 GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
43 IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
44};
45use crate::{
46 CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
47};
48
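/// Summary of the SST changes a version delta applies to one compaction group: the level that
/// receives newly inserted SSTs, the inserted SSTs themselves, and the object ids of SSTs that
/// are removed from the group.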
49#[derive(Debug, Clone, Default)]
50pub struct SstDeltaInfo {
51 pub insert_sst_level: u32,
52 pub insert_sst_infos: Vec<SstableInfo>,
53 pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
54}
55
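/// For one branched SST object, the SST ids it is referenced under, keyed by compaction group.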
56pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
57
58impl<L> HummockVersionCommon<SstableInfo, L> {
59 pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
60 self.levels
61 .get(&compaction_group_id)
62 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
63 }
64
65 pub fn get_compaction_group_levels_mut(
66 &mut self,
67 compaction_group_id: CompactionGroupId,
68 ) -> &mut Levels {
69 self.levels
70 .get_mut(&compaction_group_id)
71 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
72 }
73
    pub fn get_sst_ids_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> impl Iterator<Item = HummockSstableId> + '_ {
        self.levels
            .get(&compaction_group_id)
            .into_iter()
            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
            .flat_map(|level| level.table_infos.iter())
            .map(|s| s.sst_id)
    }
92
93 pub fn level_iter<F: FnMut(&Level) -> bool>(
94 &self,
95 compaction_group_id: CompactionGroupId,
96 mut f: F,
97 ) {
98 if let Some(levels) = self.levels.get(&compaction_group_id) {
99 for sub_level in &levels.l0.sub_levels {
100 if !f(sub_level) {
101 return;
102 }
103 }
104 for level in &levels.levels {
105 if !f(level) {
106 return;
107 }
108 }
109 }
110 }
111
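    /// Total number of levels in the group, i.e. L0 plus the configured non-L0 levels
    /// (a group built with `max_level = 6` reports 7). Returns 0 if the group does not exist.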
112 pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
113 self.levels
115 .get(&compaction_group_id)
116 .map(|group| group.levels.len() + 1)
117 .unwrap_or(0)
118 }
119
120 pub fn safe_epoch_table_watermarks(
121 &self,
122 existing_table_ids: &[TableId],
123 ) -> BTreeMap<TableId, TableWatermarks> {
124 safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
125 }
126}
127
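/// For each table in `existing_table_ids`, keeps only the watermark entry of its oldest (first)
/// recorded epoch; tables without watermarks or not in the list are omitted.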
128pub fn safe_epoch_table_watermarks_impl(
129 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
130 existing_table_ids: &[TableId],
131) -> BTreeMap<TableId, TableWatermarks> {
132 fn extract_single_table_watermark(
133 table_watermarks: &TableWatermarks,
134 ) -> Option<TableWatermarks> {
135 if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
136 Some(TableWatermarks {
137 watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
138 direction: table_watermarks.direction,
139 watermark_type: table_watermarks.watermark_type,
140 })
141 } else {
142 None
143 }
144 }
145 table_watermarks
146 .iter()
147 .filter_map(|(table_id, table_watermarks)| {
148 if !existing_table_ids.contains(table_id) {
149 None
150 } else {
151 extract_single_table_watermark(table_watermarks)
152 .map(|table_watermarks| (*table_id, table_watermarks))
153 }
154 })
155 .collect()
156}
157
158pub fn safe_epoch_read_table_watermarks_impl(
159 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
160) -> BTreeMap<TableId, ReadTableWatermark> {
161 safe_epoch_watermarks
162 .into_iter()
163 .map(|(table_id, watermarks)| {
164 assert_eq!(watermarks.watermarks.len(), 1);
165 let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
166 let mut vnode_watermark_map = BTreeMap::new();
167 for vnode_watermark in vnode_watermarks.iter() {
168 let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
169 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
170 assert!(
171 vnode_watermark_map
172 .insert(vnode, watermark.clone())
173 .is_none(),
174 "duplicate table watermark on vnode {}",
175 vnode.to_index()
176 );
177 }
178 }
179 (
180 table_id,
181 ReadTableWatermark {
182 direction: watermarks.direction,
183 vnode_watermarks: vnode_watermark_map,
184 },
185 )
186 })
187 .collect()
188}
189
190impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
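    /// Estimates how many new SSTs splitting `parent_group_id` at `split_key` would produce:
    /// each SST whose key range straddles the split key yields two SSTs (one per side).
    /// Overlapping L0 sub-levels inspect every SST; sorted levels only need to inspect the SST
    /// at the split position.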
191 pub fn count_new_ssts_in_group_split(
192 &self,
193 parent_group_id: CompactionGroupId,
194 split_key: Bytes,
195 ) -> u64 {
196 self.levels
197 .get(&parent_group_id)
198 .map_or(0, |parent_levels| {
199 let l0 = &parent_levels.l0;
200 let mut split_count = 0;
201 for sub_level in &l0.sub_levels {
202 assert!(!sub_level.table_infos.is_empty());
203
204 if sub_level.level_type == PbLevelType::Overlapping {
205 split_count += sub_level
207 .table_infos
208 .iter()
209 .map(|sst| {
210 if let group_split::SstSplitType::Both =
211 group_split::need_to_split(sst, split_key.clone())
212 {
213 2
214 } else {
215 0
216 }
217 })
218 .sum::<u64>();
219 continue;
220 }
221
222 let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
223 let sst = sub_level.table_infos.get(pos).unwrap();
224
225 if let group_split::SstSplitType::Both =
226 group_split::need_to_split(sst, split_key.clone())
227 {
228 split_count += 2;
229 }
230 }
231
232 for level in &parent_levels.levels {
233 if level.table_infos.is_empty() {
234 continue;
235 }
236 let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
237 let sst = level.table_infos.get(pos).unwrap();
238 if let group_split::SstSplitType::Both =
239 group_split::need_to_split(sst, split_key.clone())
240 {
241 split_count += 2;
242 }
243 }
244
245 split_count
246 })
247 }
248
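    /// Initializes the freshly created group `group_id` by branching out of `parent_group_id`
    /// every SST that contains data of `member_table_ids`. Branched SSTs take ids allocated from
    /// `new_sst_start_id` and are placed into the matching (sub-)levels of the new group, while
    /// parent SSTs whose table id list becomes empty are dropped. Both groups bump their
    /// `compaction_group_version_id`.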
249 pub fn init_with_parent_group(
250 &mut self,
251 parent_group_id: CompactionGroupId,
252 group_id: CompactionGroupId,
253 member_table_ids: BTreeSet<StateTableId>,
254 new_sst_start_id: HummockSstableId,
255 ) {
256 let mut new_sst_id = new_sst_start_id;
257 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
258 if new_sst_start_id != 0 {
259 if cfg!(debug_assertions) {
260 panic!(
261 "non-zero sst start id {} for NewCompactionGroup",
262 new_sst_start_id
263 );
264 } else {
265 warn!(
266 %new_sst_start_id,
267 "non-zero sst start id for NewCompactionGroup"
268 );
269 }
270 }
271 return;
272 } else if !self.levels.contains_key(&parent_group_id) {
273 unreachable!(
274 "non-existing parent group id {} to init from",
275 parent_group_id
276 );
277 }
278 let [parent_levels, cur_levels] = self
279 .levels
280 .get_disjoint_mut([&parent_group_id, &group_id])
281 .map(|res| res.unwrap());
282 parent_levels.compaction_group_version_id += 1;
285 cur_levels.compaction_group_version_id += 1;
286 let l0 = &mut parent_levels.l0;
287 {
288 for sub_level in &mut l0.sub_levels {
289 let target_l0 = &mut cur_levels.l0;
290 let insert_table_infos =
293 split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
294 sub_level
295 .table_infos
296 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
297 .for_each(|sst_info| {
298 sub_level.total_file_size -= sst_info.sst_size;
299 sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
300 l0.total_file_size -= sst_info.sst_size;
301 l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
302 });
303 if insert_table_infos.is_empty() {
304 continue;
305 }
306 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
307 Ok(idx) => {
308 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
309 }
310 Err(idx) => {
311 insert_new_sub_level(
312 target_l0,
313 sub_level.sub_level_id,
314 sub_level.level_type,
315 insert_table_infos,
316 Some(idx),
317 );
318 }
319 }
320 }
321
322 l0.sub_levels.retain(|level| !level.table_infos.is_empty());
323 }
324 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
325 let insert_table_infos =
326 split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
327 cur_levels.levels[idx].total_file_size += insert_table_infos
328 .iter()
329 .map(|sst| sst.sst_size)
330 .sum::<u64>();
331 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
332 .iter()
333 .map(|sst| sst.uncompressed_file_size)
334 .sum::<u64>();
335 cur_levels.levels[idx]
336 .table_infos
337 .extend(insert_table_infos);
338 cur_levels.levels[idx]
339 .table_infos
340 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
341 assert!(can_concat(&cur_levels.levels[idx].table_infos));
342 level
343 .table_infos
344 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
345 .for_each(|sst_info| {
346 level.total_file_size -= sst_info.sst_size;
347 level.uncompressed_file_size -= sst_info.uncompressed_file_size;
348 });
349 }
350
351 assert!(
352 parent_levels
353 .l0
354 .sub_levels
355 .iter()
356 .all(|level| !level.table_infos.is_empty())
357 );
358 assert!(
359 cur_levels
360 .l0
361 .sub_levels
362 .iter()
363 .all(|level| !level.table_infos.is_empty())
364 );
365 }
366
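    /// Derives one [`SstDeltaInfo`] per compaction group from `version_delta`, resolving removed
    /// SST ids to object ids against the current version. Trivial moves produce no info, and a
    /// group is skipped entirely if its deltas contain anything other than intra-level or
    /// new-L0-sub-level changes.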
367 pub fn build_sst_delta_infos(
368 &self,
369 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
370 ) -> Vec<SstDeltaInfo> {
371 let mut infos = vec![];
372
373 if version_delta.trivial_move {
376 return infos;
377 }
378
379 for (group_id, group_deltas) in &version_delta.group_deltas {
380 let mut info = SstDeltaInfo::default();
381
382 let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
383 let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
384
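            // Only summarize groups whose deltas consist purely of intra-level or new-L0
            // sub-level changes; groups touched by construct/destroy/merge/truncate are skipped.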
385 if !group_deltas.group_deltas.iter().all(|delta| {
387 matches!(
388 delta,
389 GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
390 )
391 }) {
392 continue;
393 }
394
395 for group_delta in &group_deltas.group_deltas {
399 match group_delta {
400 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
401 if !inserted_table_infos.is_empty() {
402 info.insert_sst_level = 0;
403 info.insert_sst_infos
404 .extend(inserted_table_infos.iter().cloned());
405 }
406 }
407 GroupDeltaCommon::IntraLevel(intra_level) => {
408 if !intra_level.inserted_table_infos.is_empty() {
409 info.insert_sst_level = intra_level.level_idx;
410 info.insert_sst_infos
411 .extend(intra_level.inserted_table_infos.iter().cloned());
412 }
413 if !intra_level.removed_table_ids.is_empty() {
414 for id in &intra_level.removed_table_ids {
415 if intra_level.level_idx == 0 {
416 removed_l0_ssts.insert(*id);
417 } else {
418 removed_ssts
419 .entry(intra_level.level_idx)
420 .or_default()
421 .insert(*id);
422 }
423 }
424 }
425 }
426 GroupDeltaCommon::GroupConstruct(_)
427 | GroupDeltaCommon::GroupDestroy(_)
428 | GroupDeltaCommon::GroupMerge(_)
429 | GroupDeltaCommon::TruncateTables(_) => {}
430 }
431 }
432
433 let group = self.levels.get(group_id).unwrap();
434 for l0_sub_level in &group.level0().sub_levels {
435 for sst_info in &l0_sub_level.table_infos {
436 if removed_l0_ssts.remove(&sst_info.sst_id) {
437 info.delete_sst_object_ids.push(sst_info.object_id);
438 }
439 }
440 }
441 for level in &group.levels {
442 if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
443 for sst_info in &level.table_infos {
444 if removed_level_ssts.remove(&sst_info.sst_id) {
445 info.delete_sst_object_ids.push(sst_info.object_id);
446 }
447 }
448 if !removed_level_ssts.is_empty() {
449 tracing::error!(
450 "removed_level_ssts is not empty: {:?}",
451 removed_level_ssts,
452 );
453 }
454 debug_assert!(removed_level_ssts.is_empty());
455 }
456 }
457
458 if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
459 tracing::error!(
460 "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
461 removed_l0_ssts,
462 removed_ssts
463 );
464 }
465 debug_assert!(removed_l0_ssts.is_empty());
466 debug_assert!(removed_ssts.is_empty());
467
468 infos.push(info);
469 }
470
471 infos
472 }
473
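    /// Applies `version_delta` in place, moving this version from `prev_id` to the delta's id.
    /// This updates state table info, per-group levels (construct/destroy/merge/intra-level/
    /// truncate), table watermarks, table change logs and vector indexes in one pass.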
474 pub fn apply_version_delta(
475 &mut self,
476 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
477 ) {
478 assert_eq!(self.id, version_delta.prev_id);
479
480 let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
481 &version_delta.state_table_info_delta,
482 &version_delta.removed_table_ids,
483 );
484
485 #[expect(deprecated)]
486 {
487 if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
488 is_commit_epoch = true;
489 tracing::trace!(
490 "max committed epoch bumped but no table committed epoch is changed"
491 );
492 }
493 }
494
495 for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
497 let mut is_applied_l0_compact = false;
498 for group_delta in &group_deltas.group_deltas {
499 match group_delta {
500 GroupDeltaCommon::GroupConstruct(group_construct) => {
501 let mut new_levels = build_initial_compaction_group_levels(
502 *compaction_group_id,
503 group_construct.get_group_config().unwrap(),
504 );
505 let parent_group_id = group_construct.parent_group_id;
506 new_levels.parent_group_id = parent_group_id;
507 #[expect(deprecated)]
508 new_levels
510 .member_table_ids
511 .clone_from(&group_construct.table_ids);
512 self.levels.insert(*compaction_group_id, new_levels);
513 let member_table_ids = if group_construct.version()
514 >= CompatibilityVersion::NoMemberTableIds
515 {
516 self.state_table_info
517 .compaction_group_member_table_ids(*compaction_group_id)
518 .iter()
519 .copied()
520 .collect()
521 } else {
522 #[expect(deprecated)]
523 BTreeSet::from_iter(
525 group_construct.table_ids.iter().copied().map(Into::into),
526 )
527 };
528
529 if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
                            let split_key = group_construct.split_key.clone().map(Bytes::from);
535 self.init_with_parent_group_v2(
536 parent_group_id,
537 *compaction_group_id,
538 group_construct.get_new_sst_start_id().into(),
539 split_key.clone(),
540 );
541 } else {
542 self.init_with_parent_group(
544 parent_group_id,
545 *compaction_group_id,
546 member_table_ids,
547 group_construct.get_new_sst_start_id().into(),
548 );
549 }
550 }
551 GroupDeltaCommon::GroupMerge(group_merge) => {
552 tracing::info!(
553 "group_merge left {:?} right {:?}",
554 group_merge.left_group_id,
555 group_merge.right_group_id
556 );
557 self.merge_compaction_group(
558 group_merge.left_group_id,
559 group_merge.right_group_id,
560 )
561 }
562 GroupDeltaCommon::IntraLevel(level_delta) => {
563 let levels =
564 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
565 panic!("compaction group {} does not exist", compaction_group_id)
566 });
567 if is_commit_epoch {
568 assert!(
569 level_delta.removed_table_ids.is_empty(),
570 "no sst should be deleted when committing an epoch"
571 );
572
573 let IntraLevelDelta {
574 level_idx,
575 l0_sub_level_id,
576 inserted_table_infos,
577 ..
578 } = level_delta;
579 {
580 assert_eq!(
581 *level_idx, 0,
582 "we should only add to L0 when we commit an epoch."
583 );
584 if !inserted_table_infos.is_empty() {
585 insert_new_sub_level(
586 &mut levels.l0,
587 *l0_sub_level_id,
588 PbLevelType::Overlapping,
589 inserted_table_infos.clone(),
590 None,
591 );
592 }
593 }
594 } else {
595 levels.apply_compact_ssts(
597 level_delta,
598 self.state_table_info
599 .compaction_group_member_table_ids(*compaction_group_id),
600 );
601 if level_delta.level_idx == 0 {
602 is_applied_l0_compact = true;
603 }
604 }
605 }
606 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
607 let levels =
608 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
609 panic!("compaction group {} does not exist", compaction_group_id)
610 });
611 assert!(is_commit_epoch);
612
613 if !inserted_table_infos.is_empty() {
614 let next_l0_sub_level_id = levels
615 .l0
616 .sub_levels
617 .last()
618 .map(|level| level.sub_level_id + 1)
619 .unwrap_or(1);
620
621 insert_new_sub_level(
622 &mut levels.l0,
623 next_l0_sub_level_id,
624 PbLevelType::Overlapping,
625 inserted_table_infos.clone(),
626 None,
627 );
628 }
629 }
630 GroupDeltaCommon::GroupDestroy(_) => {
631 self.levels.remove(compaction_group_id);
632 }
633
634 GroupDeltaCommon::TruncateTables(table_ids) => {
635 let levels =
637 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
638 panic!("compaction group {} does not exist", compaction_group_id)
639 });
640
641 for sub_level in &mut levels.l0.sub_levels {
642 sub_level.table_infos.iter_mut().for_each(|sstable_info| {
643 let mut inner = sstable_info.get_inner();
644 inner.table_ids.retain(|id| !table_ids.contains(id));
645 sstable_info.set_inner(inner);
646 });
647 }
648
649 for level in &mut levels.levels {
650 level.table_infos.iter_mut().for_each(|sstable_info| {
651 let mut inner = sstable_info.get_inner();
652 inner.table_ids.retain(|id| !table_ids.contains(id));
653 sstable_info.set_inner(inner);
654 });
655 }
656
657 levels.compaction_group_version_id += 1;
658 }
659 }
660 }
661
662 if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
663 {
664 levels.post_apply_l0_compact();
665 }
666 }
667 self.id = version_delta.id;
668 #[expect(deprecated)]
669 {
670 self.max_committed_epoch = version_delta.max_committed_epoch;
671 }
672
673 let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
677 HashMap::new();
678
679 for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
681 if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
682 if version_delta.removed_table_ids.contains(table_id) {
683 modified_table_watermarks.insert(*table_id, None);
684 } else {
685 let mut current_table_watermarks = (**current_table_watermarks).clone();
686 current_table_watermarks.apply_new_table_watermarks(table_watermarks);
687 modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
688 }
689 } else {
690 modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
691 }
692 }
693 for (table_id, table_watermarks) in &self.table_watermarks {
694 let safe_epoch = if let Some(state_table_info) =
695 self.state_table_info.info().get(table_id)
696 && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
697 && state_table_info.committed_epoch > *oldest_epoch
698 {
699 state_table_info.committed_epoch
701 } else {
702 continue;
704 };
705 let table_watermarks = modified_table_watermarks
706 .entry(*table_id)
707 .or_insert_with(|| Some((**table_watermarks).clone()));
708 if let Some(table_watermarks) = table_watermarks {
709 table_watermarks.clear_stale_epoch_watermark(safe_epoch);
710 }
711 }
712 for (table_id, table_watermarks) in modified_table_watermarks {
714 if let Some(table_watermarks) = table_watermarks {
715 self.table_watermarks
716 .insert(table_id, Arc::new(table_watermarks));
717 } else {
718 self.table_watermarks.remove(&table_id);
719 }
720 }
721
722 Self::apply_change_log_delta(
724 &mut self.table_change_log,
725 &version_delta.change_log_delta,
726 &version_delta.removed_table_ids,
727 &version_delta.state_table_info_delta,
728 &changed_table_info,
729 );
730
731 apply_vector_index_delta(
733 &mut self.vector_indexes,
734 &version_delta.vector_index_delta,
735 &version_delta.removed_table_ids,
736 );
737 }
738
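    /// Merges `change_log_delta` into `table_change_log`: appends each table's new log entry,
    /// drops logs of removed tables and of tables that committed a new epoch without a
    /// corresponding change log entry, then truncates every touched log to its `truncate_epoch`.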
739 pub fn apply_change_log_delta<T: Clone>(
740 table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
741 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
742 removed_table_ids: &HashSet<TableId>,
743 state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
744 changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
745 ) {
746 for (table_id, change_log_delta) in change_log_delta {
747 let new_change_log = &change_log_delta.new_log;
748 match table_change_log.entry(*table_id) {
749 Entry::Occupied(entry) => {
750 let change_log = entry.into_mut();
751 change_log.add_change_log(new_change_log.clone());
752 }
753 Entry::Vacant(entry) => {
754 entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
755 }
756 };
757 }
758
759 table_change_log.retain(|table_id, _| {
763 if removed_table_ids.contains(table_id) {
764 return false;
765 }
            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
                && table_info_delta.committed_epoch > prev_table_info.committed_epoch
            {
                // The table existed before this delta and has a newly committed epoch:
                // fall through and require a matching change log entry below.
            } else {
                // Otherwise (table newly created, or no new commit for it in this delta),
                // keep its change log as-is.
                return true;
            }
773 let contains = change_log_delta.contains_key(table_id);
774 if !contains {
775 warn!(
776 ?table_id,
777 "table change log dropped due to no further change log at newly committed epoch",
778 );
779 }
780 contains
781 });
782
783 for (table_id, change_log_delta) in change_log_delta {
785 if let Some(change_log) = table_change_log.get_mut(table_id) {
786 change_log.truncate(change_log_delta.truncate_epoch);
787 }
788 }
789 }
790
791 pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
792 let mut ret: BTreeMap<_, _> = BTreeMap::new();
793 for (compaction_group_id, group) in &self.levels {
794 let mut levels = vec![];
795 levels.extend(group.l0.sub_levels.iter());
796 levels.extend(group.levels.iter());
797 for level in levels {
798 for table_info in &level.table_infos {
799 if table_info.sst_id.inner() == table_info.object_id.inner() {
800 continue;
801 }
802 let object_id = table_info.object_id;
803 let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
804 entry
805 .entry(*compaction_group_id)
806 .or_default()
807 .push(table_info.sst_id)
808 }
809 }
810 }
811 ret
812 }
813
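    /// Merges `right_group_id` into `left_group_id` and removes the right group. The member table
    /// ids of the two groups must already be sorted when chained left-then-right.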
814 pub fn merge_compaction_group(
815 &mut self,
816 left_group_id: CompactionGroupId,
817 right_group_id: CompactionGroupId,
818 ) {
819 let left_group_id_table_ids = self
821 .state_table_info
822 .compaction_group_member_table_ids(left_group_id)
823 .iter();
824 let right_group_id_table_ids = self
825 .state_table_info
826 .compaction_group_member_table_ids(right_group_id)
827 .iter();
828
829 assert!(
830 left_group_id_table_ids
831 .chain(right_group_id_table_ids)
832 .is_sorted()
833 );
834
835 let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
836 let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
837 panic!(
838 "compaction group should exist right {} all {:?}",
839 right_group_id, total_cg
840 )
841 });
842
843 let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
844 panic!(
845 "compaction group should exist left {} all {:?}",
846 left_group_id, total_cg
847 )
848 });
849
850 group_split::merge_levels(left_levels, right_levels);
851 }
852
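    /// Like [`Self::init_with_parent_group`], but branches the parent group's SSTs at an explicit
    /// `split_key` rather than by member table ids; used for deltas written at or after
    /// `CompatibilityVersion::SplitGroupByTableId`.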
853 pub fn init_with_parent_group_v2(
854 &mut self,
855 parent_group_id: CompactionGroupId,
856 group_id: CompactionGroupId,
857 new_sst_start_id: HummockSstableId,
858 split_key: Option<Bytes>,
859 ) {
860 let mut new_sst_id = new_sst_start_id;
861 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
862 if new_sst_start_id != 0 {
863 if cfg!(debug_assertions) {
864 panic!(
865 "non-zero sst start id {} for NewCompactionGroup",
866 new_sst_start_id
867 );
868 } else {
869 warn!(
870 %new_sst_start_id,
871 "non-zero sst start id for NewCompactionGroup"
872 );
873 }
874 }
875 return;
876 } else if !self.levels.contains_key(&parent_group_id) {
877 unreachable!(
878 "non-existing parent group id {} to init from (V2)",
879 parent_group_id
880 );
881 }
882
883 let [parent_levels, cur_levels] = self
884 .levels
885 .get_disjoint_mut([&parent_group_id, &group_id])
886 .map(|res| res.unwrap());
887 parent_levels.compaction_group_version_id += 1;
890 cur_levels.compaction_group_version_id += 1;
891
892 let l0 = &mut parent_levels.l0;
893 {
894 for sub_level in &mut l0.sub_levels {
895 let target_l0 = &mut cur_levels.l0;
896 let insert_table_infos = if let Some(split_key) = &split_key {
899 group_split::split_sst_info_for_level_v2(
900 sub_level,
901 &mut new_sst_id,
902 split_key.clone(),
903 )
904 } else {
905 vec![]
906 };
907
908 if insert_table_infos.is_empty() {
909 continue;
910 }
911
912 sub_level
913 .table_infos
914 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
915 .for_each(|sst_info| {
916 sub_level.total_file_size -= sst_info.sst_size;
917 sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
918 l0.total_file_size -= sst_info.sst_size;
919 l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
920 });
921 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
922 Ok(idx) => {
923 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
924 }
925 Err(idx) => {
926 insert_new_sub_level(
927 target_l0,
928 sub_level.sub_level_id,
929 sub_level.level_type,
930 insert_table_infos,
931 Some(idx),
932 );
933 }
934 }
935 }
936
937 l0.sub_levels.retain(|level| !level.table_infos.is_empty());
938 }
939
940 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
941 let insert_table_infos = if let Some(split_key) = &split_key {
942 group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
943 } else {
944 vec![]
945 };
946
947 if insert_table_infos.is_empty() {
948 continue;
949 }
950
951 cur_levels.levels[idx].total_file_size += insert_table_infos
952 .iter()
953 .map(|sst| sst.sst_size)
954 .sum::<u64>();
955 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
956 .iter()
957 .map(|sst| sst.uncompressed_file_size)
958 .sum::<u64>();
959 cur_levels.levels[idx]
960 .table_infos
961 .extend(insert_table_infos);
962 cur_levels.levels[idx]
963 .table_infos
964 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
965 assert!(can_concat(&cur_levels.levels[idx].table_infos));
966 level
967 .table_infos
968 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
969 .for_each(|sst_info| {
970 level.total_file_size -= sst_info.sst_size;
971 level.uncompressed_file_size -= sst_info.uncompressed_file_size;
972 });
973 }
974
975 assert!(
976 parent_levels
977 .l0
978 .sub_levels
979 .iter()
980 .all(|level| !level.table_infos.is_empty())
981 );
982 assert!(
983 cur_levels
984 .l0
985 .sub_levels
986 .iter()
987 .all(|level| !level.table_infos.is_empty())
988 );
989 }
990}
991
992impl<T> HummockVersionCommon<T>
993where
994 T: SstableIdReader + ObjectIdReader,
995{
996 pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
997 self.get_sst_infos(exclude_change_log)
998 .map(|s| s.object_id())
999 .collect()
1000 }
1001
1002 pub fn get_object_ids(
1003 &self,
1004 exclude_change_log: bool,
1005 ) -> impl Iterator<Item = HummockObjectId> + '_ {
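        // Exhaustive match on a dummy value: adding a new `HummockObjectId` variant fails to
        // compile here, as a reminder to also include that kind of object in this method.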
1006 match HummockObjectId::Sstable(0.into()) {
1010 HummockObjectId::Sstable(_) => {}
1011 HummockObjectId::VectorFile(_) => {}
1012 HummockObjectId::HnswGraphFile(_) => {}
1013 };
1014 self.get_sst_infos(exclude_change_log)
1015 .map(|s| HummockObjectId::Sstable(s.object_id()))
1016 .chain(
1017 self.vector_indexes
1018 .values()
1019 .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
1020 )
1021 }
1022
1023 pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
1024 self.get_sst_infos(exclude_change_log)
1025 .map(|s| s.sst_id())
1026 .collect()
1027 }
1028
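    /// Iterates over the SSTs referenced by this version's levels, plus (unless
    /// `exclude_change_log`) those referenced by table change logs.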
1029 pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
1030 let may_table_change_log = if exclude_change_log {
1031 None
1032 } else {
1033 Some(self.table_change_log.values())
1034 };
1035 self.get_combined_levels()
1036 .flat_map(|level| level.table_infos.iter())
1037 .chain(
1038 may_table_change_log
1039 .map(|v| {
1040 v.flat_map(|table_change_log| {
1041 table_change_log.iter().flat_map(|epoch_change_log| {
1042 epoch_change_log
1043 .old_value
1044 .iter()
1045 .chain(epoch_change_log.new_value.iter())
1046 })
1047 })
1048 })
1049 .into_iter()
1050 .flatten(),
1051 )
1052 }
1053}
1054
1055impl Levels {
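    /// Applies a compaction-produced intra-level delta: removes the task's input SSTs and inserts
    /// its output SSTs into the target (sub-)level, keeping the size counters up to date. The
    /// delta is skipped with a warning when its `compaction_group_version_id` shows the task was
    /// scheduled against an outdated group layout.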
1056 pub(crate) fn apply_compact_ssts(
1057 &mut self,
1058 level_delta: &IntraLevelDeltaCommon<SstableInfo>,
1059 member_table_ids: &BTreeSet<TableId>,
1060 ) {
1061 let IntraLevelDeltaCommon {
1062 level_idx,
1063 l0_sub_level_id,
1064 inserted_table_infos: insert_table_infos,
1065 vnode_partition_count,
1066 removed_table_ids: delete_sst_ids_set,
1067 compaction_group_version_id,
1068 } = level_delta;
1069 let new_vnode_partition_count = *vnode_partition_count;
1070
1071 if is_compaction_task_expired(
1072 self.compaction_group_version_id,
1073 *compaction_group_version_id,
1074 ) {
1075 warn!(
1076 current_compaction_group_version_id = self.compaction_group_version_id,
1077 delta_compaction_group_version_id = compaction_group_version_id,
1078 level_idx,
1079 l0_sub_level_id,
1080 insert_table_infos = ?insert_table_infos
1081 .iter()
1082 .map(|sst| (sst.sst_id, sst.object_id))
1083 .collect_vec(),
1084 ?delete_sst_ids_set,
1085 "This VersionDelta may be committed by an expired compact task. Please check it."
1086 );
1087 return;
1088 }
1089 if !delete_sst_ids_set.is_empty() {
1090 if *level_idx == 0 {
1091 for level in &mut self.l0.sub_levels {
1092 level_delete_ssts(level, delete_sst_ids_set);
1093 }
1094 } else {
1095 let idx = *level_idx as usize - 1;
1096 level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
1097 }
1098 }
1099
1100 if !insert_table_infos.is_empty() {
1101 let insert_sst_level_id = *level_idx;
1102 let insert_sub_level_id = *l0_sub_level_id;
1103 if insert_sst_level_id == 0 {
1104 let l0 = &mut self.l0;
1105 let index = l0
1106 .sub_levels
1107 .partition_point(|level| level.sub_level_id < insert_sub_level_id);
1108 assert!(
1109 index < l0.sub_levels.len()
1110 && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
1111 "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
1112 insert_sub_level_id,
1113 delete_sst_ids_set,
1114 l0.sub_levels
1115 .iter()
1116 .map(|level| level.sub_level_id)
1117 .collect_vec()
1118 );
1119 if l0.sub_levels[index].table_infos.is_empty()
1120 && member_table_ids.len() == 1
1121 && insert_table_infos.iter().all(|sst| {
1122 sst.table_ids.len() == 1
1123 && sst.table_ids[0]
1124 == *member_table_ids.iter().next().expect("non-empty")
1125 })
1126 {
1127 l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
1130 }
1131 level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
1132 } else {
1133 let idx = insert_sst_level_id as usize - 1;
1134 if self.levels[idx].table_infos.is_empty()
1135 && insert_table_infos
1136 .iter()
1137 .all(|sst| sst.table_ids.len() == 1)
1138 {
1139 self.levels[idx].vnode_partition_count = new_vnode_partition_count;
1140 } else if self.levels[idx].vnode_partition_count != 0
1141 && new_vnode_partition_count == 0
1142 && member_table_ids.len() > 1
1143 {
1144 self.levels[idx].vnode_partition_count = 0;
1145 }
1146 level_insert_ssts(&mut self.levels[idx], insert_table_infos);
1147 }
1148 }
1149 }
1150
1151 pub(crate) fn post_apply_l0_compact(&mut self) {
1152 {
1153 self.l0
1154 .sub_levels
1155 .retain(|level| !level.table_infos.is_empty());
1156 self.l0.total_file_size = self
1157 .l0
1158 .sub_levels
1159 .iter()
1160 .map(|level| level.total_file_size)
1161 .sum::<u64>();
1162 self.l0.uncompressed_file_size = self
1163 .l0
1164 .sub_levels
1165 .iter()
1166 .map(|level| level.uncompressed_file_size)
1167 .sum::<u64>();
1168 }
1169 }
1170}
1171
1172impl<T, L> HummockVersionCommon<T, L> {
1173 pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1174 self.levels
1175 .values()
1176 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1177 }
1178}
1179
1180pub fn build_initial_compaction_group_levels(
1181 group_id: CompactionGroupId,
1182 compaction_config: &CompactionConfig,
1183) -> Levels {
1184 let mut levels = vec![];
1185 for l in 0..compaction_config.get_max_level() {
1186 levels.push(Level {
1187 level_idx: (l + 1) as u32,
1188 level_type: PbLevelType::Nonoverlapping,
1189 table_infos: vec![],
1190 total_file_size: 0,
1191 sub_level_id: 0,
1192 uncompressed_file_size: 0,
1193 vnode_partition_count: 0,
1194 });
1195 }
1196 #[expect(deprecated)] Levels {
1198 levels,
1199 l0: OverlappingLevel {
1200 sub_levels: vec![],
1201 total_file_size: 0,
1202 uncompressed_file_size: 0,
1203 },
1204 group_id,
1205 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
1206 member_table_ids: vec![],
1207 compaction_group_version_id: 0,
1208 }
1209}
1210
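/// Branches every SST in `level` that contains data of `member_table_ids`, returning the branch
/// SSTs destined for the child group; the originals stay in `level`. Each side of a branched SST
/// is assigned half of the original `sst_size` as an estimate, hence the warning for SSTs whose
/// recorded size is smaller than 2.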
1211fn split_sst_info_for_level(
1212 member_table_ids: &BTreeSet<TableId>,
1213 level: &mut Level,
1214 new_sst_id: &mut HummockSstableId,
1215) -> Vec<SstableInfo> {
1216 let mut insert_table_infos = vec![];
1219 for sst_info in &mut level.table_infos {
1220 let removed_table_ids = sst_info
1221 .table_ids
1222 .iter()
1223 .filter(|table_id| member_table_ids.contains(table_id))
1224 .cloned()
1225 .collect_vec();
        let sst_size = sst_info.sst_size;
        // Both halves of a branched SST are assigned `sst_size / 2` below, so warn if the
        // recorded size is too small for that to be meaningful.
        if sst_size / 2 == 0 {
            tracing::warn!(
                id = %sst_info.sst_id,
                object_id = %sst_info.object_id,
                sst_size = sst_info.sst_size,
                file_size = sst_info.file_size,
                "sstable sst_size is smaller than expected",
            );
        };
1236 if !removed_table_ids.is_empty() {
1237 let (modified_sst, branch_sst) = split_sst_with_table_ids(
1238 sst_info,
1239 new_sst_id,
1240 sst_size / 2,
1241 sst_size / 2,
1242 member_table_ids.iter().cloned().collect_vec(),
1243 );
1244 *sst_info = modified_sst;
1245 insert_table_infos.push(branch_sst);
1246 }
1247 }
1248 insert_table_infos
1249}
1250
1251pub fn get_compaction_group_ids(
1253 version: &HummockVersion,
1254) -> impl Iterator<Item = CompactionGroupId> + '_ {
1255 version.levels.keys().cloned()
1256}
1257
1258pub fn get_table_compaction_group_id_mapping(
1259 version: &HummockVersion,
1260) -> HashMap<StateTableId, CompactionGroupId> {
1261 version
1262 .state_table_info
1263 .info()
1264 .iter()
1265 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
1266 .collect()
1267}
1268
1269pub fn get_compaction_group_ssts(
1271 version: &HummockVersion,
1272 group_id: CompactionGroupId,
1273) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1274 let group_levels = version.get_compaction_group_levels(group_id);
1275 group_levels
1276 .l0
1277 .sub_levels
1278 .iter()
1279 .rev()
1280 .chain(group_levels.levels.iter())
1281 .flat_map(|level| {
1282 level
1283 .table_infos
1284 .iter()
1285 .map(|table_info| (table_info.object_id, table_info.sst_id))
1286 })
1287}
1288
1289pub fn new_sub_level(
1290 sub_level_id: u64,
1291 level_type: PbLevelType,
1292 table_infos: Vec<SstableInfo>,
1293) -> Level {
1294 if level_type == PbLevelType::Nonoverlapping {
1295 debug_assert!(
1296 can_concat(&table_infos),
1297 "sst of non-overlapping level is not concat-able: {:?}",
1298 table_infos
1299 );
1300 }
1301 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1302 let uncompressed_file_size = table_infos
1303 .iter()
1304 .map(|table| table.uncompressed_file_size)
1305 .sum();
1306 Level {
1307 level_idx: 0,
1308 level_type,
1309 table_infos,
1310 total_file_size,
1311 sub_level_id,
1312 uncompressed_file_size,
1313 vnode_partition_count: 0,
1314 }
1315}
1316
1317pub fn add_ssts_to_sub_level(
1318 l0: &mut OverlappingLevel,
1319 sub_level_idx: usize,
1320 insert_table_infos: Vec<SstableInfo>,
1321) {
1322 insert_table_infos.iter().for_each(|sst| {
1323 l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1324 l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1325 l0.total_file_size += sst.sst_size;
1326 l0.uncompressed_file_size += sst.uncompressed_file_size;
1327 });
1328 l0.sub_levels[sub_level_idx]
1329 .table_infos
1330 .extend(insert_table_infos);
1331 if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1332 l0.sub_levels[sub_level_idx]
1333 .table_infos
1334 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1335 assert!(
1336 can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1337 "sstable ids: {:?}",
1338 l0.sub_levels[sub_level_idx]
1339 .table_infos
1340 .iter()
1341 .map(|sst| sst.sst_id)
1342 .collect_vec()
1343 );
1344 }
1345}
1346
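/// Inserts a brand-new L0 sub-level holding `insert_table_infos`. Without an explicit
/// `sub_level_insert_hint` the sub-level is appended as the newest one, which requires
/// `insert_sub_level_id` to be larger than any existing sub-level id; an id of `u64::MAX` is
/// ignored.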
1347pub fn insert_new_sub_level(
1349 l0: &mut OverlappingLevel,
1350 insert_sub_level_id: u64,
1351 level_type: PbLevelType,
1352 insert_table_infos: Vec<SstableInfo>,
1353 sub_level_insert_hint: Option<usize>,
1354) {
1355 if insert_sub_level_id == u64::MAX {
1356 return;
1357 }
1358 let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1359 insert_pos
1360 } else {
1361 if let Some(newest_level) = l0.sub_levels.last() {
1362 assert!(
1363 newest_level.sub_level_id < insert_sub_level_id,
1364 "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1365 newest_level.sub_level_id,
1366 insert_sub_level_id,
1367 l0,
1368 );
1369 }
1370 l0.sub_levels.len()
1371 };
1372 #[cfg(debug_assertions)]
1373 {
1374 if insert_pos > 0
1375 && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1376 {
1377 debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1378 }
1379 if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1380 debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1381 }
1382 }
1383 let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1386 l0.total_file_size += level.total_file_size;
1387 l0.uncompressed_file_size += level.uncompressed_file_size;
1388 l0.sub_levels.insert(insert_pos, level);
1389}
1390
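/// Removes from `operand` every SST whose id appears in `delete_sst_ids_superset` and recomputes
/// the level's size counters; returns whether any SST was removed.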
1391fn level_delete_ssts(
1395 operand: &mut Level,
1396 delete_sst_ids_superset: &HashSet<HummockSstableId>,
1397) -> bool {
1398 let original_len = operand.table_infos.len();
1399 operand
1400 .table_infos
1401 .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1402 operand.total_file_size = operand
1403 .table_infos
1404 .iter()
1405 .map(|table| table.sst_size)
1406 .sum::<u64>();
1407 operand.uncompressed_file_size = operand
1408 .table_infos
1409 .iter()
1410 .map(|table| table.uncompressed_file_size)
1411 .sum::<u64>();
1412 original_len != operand.table_infos.len()
1413}
1414
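/// Inserts compaction output SSTs into a sorted level, converting an overlapping level to
/// non-overlapping if needed and keeping `table_infos` ordered by key range. The result must stay
/// concatenation-safe; if the inserted range unexpectedly overlaps existing SSTs, the function
/// falls back to per-SST insertion with a warning and re-asserts the invariant.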
1415fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1416 fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1417 format!(
1418 "sstable ids: {:?}",
1419 ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1420 )
1421 }
1422 operand.total_file_size += insert_table_infos
1423 .iter()
1424 .map(|sst| sst.sst_size)
1425 .sum::<u64>();
1426 operand.uncompressed_file_size += insert_table_infos
1427 .iter()
1428 .map(|sst| sst.uncompressed_file_size)
1429 .sum::<u64>();
1430 if operand.level_type == PbLevelType::Overlapping {
1431 operand.level_type = PbLevelType::Nonoverlapping;
1432 operand
1433 .table_infos
1434 .extend(insert_table_infos.iter().cloned());
1435 operand
1436 .table_infos
1437 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1438 assert!(
1439 can_concat(&operand.table_infos),
1440 "{}",
1441 display_sstable_infos(&operand.table_infos)
1442 );
1443 } else if !insert_table_infos.is_empty() {
1444 let sorted_insert: Vec<_> = insert_table_infos
1445 .iter()
1446 .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1447 .cloned()
1448 .collect();
1449 let first = &sorted_insert[0];
1450 let last = &sorted_insert[sorted_insert.len() - 1];
1451 let pos = operand
1452 .table_infos
1453 .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1454 if pos >= operand.table_infos.len()
1455 || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1456 {
1457 operand.table_infos.splice(pos..pos, sorted_insert);
1458 let validate_range = operand
1460 .table_infos
1461 .iter()
1462 .skip(pos.saturating_sub(1))
1463 .take(insert_table_infos.len() + 2)
1464 .collect_vec();
1465 assert!(
1466 can_concat(&validate_range),
1467 "{}",
1468 display_sstable_infos(&validate_range),
1469 );
1470 } else {
1471 warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1474 for i in insert_table_infos {
1475 let pos = operand
1476 .table_infos
1477 .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1478 operand.table_infos.insert(pos, i.clone());
1479 }
1480 assert!(
1481 can_concat(&operand.table_infos),
1482 "{}",
1483 display_sstable_infos(&operand.table_infos)
1484 );
1485 }
1486 }
1487}
1488
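/// Maps every object referenced by `version` (level SSTs, change-log SSTs and vector index files)
/// to its file size in bytes. The leading exhaustive `match` is a compile-time reminder to update
/// this function whenever a new `HummockObjectId` variant is added.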
1489pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1490 match HummockObjectId::Sstable(0.into()) {
1494 HummockObjectId::Sstable(_) => {}
1495 HummockObjectId::VectorFile(_) => {}
1496 HummockObjectId::HnswGraphFile(_) => {}
1497 };
1498 version
1499 .levels
1500 .values()
1501 .flat_map(|cg| {
1502 cg.level0()
1503 .sub_levels
1504 .iter()
1505 .chain(cg.levels.iter())
1506 .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1507 })
1508 .chain(version.table_change_log.values().flat_map(|c| {
1509 c.iter().flat_map(|l| {
1510 l.old_value
1511 .iter()
1512 .chain(l.new_value.iter())
1513 .map(|t| (t.object_id, t.file_size))
1514 })
1515 }))
1516 .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1517 .chain(
1518 version
1519 .vector_indexes
1520 .values()
1521 .flat_map(|index| index.get_objects()),
1522 )
1523 .collect()
1524}
1525
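/// Runs consistency checks over `version` (group ids, level indexes, L0 sub-level ordering,
/// sorted table ids, non-overlapping key ranges in sorted levels) and returns a human-readable
/// description of each violation; an empty result means no problem was found.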
1526pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1529 let mut res = Vec::new();
1530 for (group_id, levels) in &version.levels {
1532 if levels.group_id != *group_id {
1534 res.push(format!(
1535 "GROUP {}: inconsistent group id {} in Levels",
1536 group_id, levels.group_id
1537 ));
1538 }
1539
1540 let validate_level = |group: CompactionGroupId,
1541 expected_level_idx: u32,
1542 level: &Level,
1543 res: &mut Vec<String>| {
1544 let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1545 if level.level_idx == 0 {
                level_identifier.push_str(format!(" SUBLEVEL {}", level.sub_level_id).as_str());
1547 if level.table_infos.is_empty() {
1549 res.push(format!("{}: empty level", level_identifier));
1550 }
1551 } else if level.level_type != PbLevelType::Nonoverlapping {
1552 res.push(format!(
1554 "{}: level type {:?} is not non-overlapping",
1555 level_identifier, level.level_type
1556 ));
1557 }
1558
1559 if level.level_idx != expected_level_idx {
1561 res.push(format!(
1562 "{}: mismatched level idx {}",
1563 level_identifier, expected_level_idx
1564 ));
1565 }
1566
1567 let mut prev_table_info: Option<&SstableInfo> = None;
1568 for table_info in &level.table_infos {
1569 if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1571 res.push(format!(
1572 "{} SST {}: table_ids not sorted",
1573 level_identifier, table_info.object_id
1574 ));
1575 }
1576
1577 if level.level_type == PbLevelType::Nonoverlapping {
1579 if let Some(prev) = prev_table_info.take()
1580 && prev
1581 .key_range
1582 .compare_right_with(&table_info.key_range.left)
1583 != Ordering::Less
1584 {
1585 res.push(format!(
1586 "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1587 level_identifier, table_info.object_id, prev, table_info
1588 ));
1589 }
1590 let _ = prev_table_info.insert(table_info);
1591 }
1592 }
1593 };
1594
1595 let l0 = &levels.l0;
        let mut prev_sub_level_id: Option<u64> = None;
        for sub_level in &l0.sub_levels {
            // L0 sub-levels must be ordered by strictly increasing sub_level_id (oldest first).
            if let Some(prev_sub_level_id) = prev_sub_level_id
                && sub_level.sub_level_id <= prev_sub_level_id
            {
                res.push(format!(
                    "GROUP {} LEVEL 0: sub_level_id {} <= prev_sub_level_id {}",
                    group_id, sub_level.sub_level_id, prev_sub_level_id
                ));
            }
            prev_sub_level_id = Some(sub_level.sub_level_id);
1606
1607 validate_level(*group_id, 0, sub_level, &mut res);
1608 }
1609
1610 for idx in 1..=levels.levels.len() {
1611 validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1612 }
1613 }
1614 res
1615}
1616
1617#[cfg(test)]
1618mod tests {
1619 use std::collections::{HashMap, HashSet};
1620
1621 use bytes::Bytes;
1622 use risingwave_common::catalog::TableId;
1623 use risingwave_common::hash::VirtualNode;
1624 use risingwave_common::util::epoch::test_epoch;
1625 use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1626
1627 use super::group_split;
1628 use crate::HummockVersionId;
1629 use crate::compaction_group::group_split::*;
1630 use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1631 use crate::key::{FullKey, gen_key_from_str};
1632 use crate::key_range::KeyRange;
1633 use crate::level::{Level, Levels, OverlappingLevel};
1634 use crate::sstable_info::{SstableInfo, SstableInfoInner};
1635 use crate::version::{
1636 GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1637 };
1638
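    // Test helper: builds an SST spanning from vnode 0 of the first table id to
    // VirtualNode::MAX_FOR_TEST of the last table id, with fixed 100-byte file/sst sizes.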
1639 fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1640 gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1641 }
1642
1643 fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1644 let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1645 let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1646 let full_key_l = FullKey::for_test(
1647 TableId::new(*table_ids.first().unwrap()),
1648 table_key_l,
1649 epoch,
1650 )
1651 .encode();
1652 let full_key_r =
1653 FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1654 .encode();
1655
1656 SstableInfoInner {
1657 sst_id: sst_id.into(),
1658 key_range: KeyRange {
1659 left: full_key_l.into(),
1660 right: full_key_r.into(),
1661 right_exclusive: false,
1662 },
1663 table_ids: table_ids.into_iter().map(Into::into).collect(),
1664 object_id: sst_id.into(),
1665 min_epoch: 20,
1666 max_epoch: 20,
1667 file_size: 100,
1668 sst_size: 100,
1669 ..Default::default()
1670 }
1671 }
1672
1673 #[test]
1674 fn test_get_sst_object_ids() {
1675 let mut version = HummockVersion {
1676 id: HummockVersionId::new(0),
1677 levels: HashMap::from_iter([(
1678 0,
1679 Levels {
1680 levels: vec![],
1681 l0: OverlappingLevel {
1682 sub_levels: vec![],
1683 total_file_size: 0,
1684 uncompressed_file_size: 0,
1685 },
1686 ..Default::default()
1687 },
1688 )]),
1689 ..Default::default()
1690 };
1691 assert_eq!(version.get_object_ids(false).count(), 0);
1692
1693 version
1695 .levels
1696 .get_mut(&0)
1697 .unwrap()
1698 .l0
1699 .sub_levels
1700 .push(Level {
1701 table_infos: vec![
1702 SstableInfoInner {
1703 object_id: 11.into(),
1704 sst_id: 11.into(),
1705 ..Default::default()
1706 }
1707 .into(),
1708 ],
1709 ..Default::default()
1710 });
1711 assert_eq!(version.get_object_ids(false).count(), 1);
1712
1713 version.levels.get_mut(&0).unwrap().levels.push(Level {
1715 table_infos: vec![
1716 SstableInfoInner {
1717 object_id: 22.into(),
1718 sst_id: 22.into(),
1719 ..Default::default()
1720 }
1721 .into(),
1722 ],
1723 ..Default::default()
1724 });
1725 assert_eq!(version.get_object_ids(false).count(), 2);
1726 }
1727
1728 #[test]
1729 fn test_apply_version_delta() {
1730 let mut version = HummockVersion {
1731 id: HummockVersionId::new(0),
1732 levels: HashMap::from_iter([
1733 (
1734 0,
1735 build_initial_compaction_group_levels(
1736 0,
1737 &CompactionConfig {
1738 max_level: 6,
1739 ..Default::default()
1740 },
1741 ),
1742 ),
1743 (
1744 1,
1745 build_initial_compaction_group_levels(
1746 1,
1747 &CompactionConfig {
1748 max_level: 6,
1749 ..Default::default()
1750 },
1751 ),
1752 ),
1753 ]),
1754 ..Default::default()
1755 };
1756 let version_delta = HummockVersionDelta {
1757 id: HummockVersionId::new(1),
1758 group_deltas: HashMap::from_iter([
1759 (
1760 2,
1761 GroupDeltas {
1762 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1763 group_config: Some(CompactionConfig {
1764 max_level: 6,
1765 ..Default::default()
1766 }),
1767 ..Default::default()
1768 }))],
1769 },
1770 ),
1771 (
1772 0,
1773 GroupDeltas {
1774 group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1775 },
1776 ),
1777 (
1778 1,
1779 GroupDeltas {
1780 group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1781 1,
1782 0,
1783 HashSet::new(),
1784 vec![
1785 SstableInfoInner {
1786 object_id: 1.into(),
1787 sst_id: 1.into(),
1788 ..Default::default()
1789 }
1790 .into(),
1791 ],
1792 0,
1793 version
1794 .levels
1795 .get(&1)
1796 .as_ref()
1797 .unwrap()
1798 .compaction_group_version_id,
1799 ))],
1800 },
1801 ),
1802 ]),
1803 ..Default::default()
1804 };
1805 let version_delta = version_delta;
1806
1807 version.apply_version_delta(&version_delta);
1808 let mut cg1 = build_initial_compaction_group_levels(
1809 1,
1810 &CompactionConfig {
1811 max_level: 6,
1812 ..Default::default()
1813 },
1814 );
1815 cg1.levels[0] = Level {
1816 level_idx: 1,
1817 level_type: LevelType::Nonoverlapping,
1818 table_infos: vec![
1819 SstableInfoInner {
1820 object_id: 1.into(),
1821 sst_id: 1.into(),
1822 ..Default::default()
1823 }
1824 .into(),
1825 ],
1826 ..Default::default()
1827 };
1828 assert_eq!(
1829 version,
1830 HummockVersion {
1831 id: HummockVersionId::new(1),
1832 levels: HashMap::from_iter([
1833 (
1834 2,
1835 build_initial_compaction_group_levels(
1836 2,
1837 &CompactionConfig {
1838 max_level: 6,
1839 ..Default::default()
1840 },
1841 ),
1842 ),
1843 (1, cg1),
1844 ]),
1845 ..Default::default()
1846 }
1847 );
1848 }
1849
1850 fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1851 gen_sst_info_impl(object_id, table_ids, left, right).into()
1852 }
1853
1854 fn gen_sst_info_impl(
1855 object_id: u64,
1856 table_ids: Vec<u32>,
1857 left: Bytes,
1858 right: Bytes,
1859 ) -> SstableInfoInner {
1860 SstableInfoInner {
1861 object_id: object_id.into(),
1862 sst_id: object_id.into(),
1863 key_range: KeyRange {
1864 left,
1865 right,
1866 right_exclusive: false,
1867 },
1868 table_ids: table_ids.into_iter().map(Into::into).collect(),
1869 file_size: 100,
1870 sst_size: 100,
1871 uncompressed_file_size: 100,
1872 ..Default::default()
1873 }
1874 }
1875
1876 #[test]
1877 fn test_merge_levels() {
1878 let mut left_levels = build_initial_compaction_group_levels(
1879 1,
1880 &CompactionConfig {
1881 max_level: 6,
1882 ..Default::default()
1883 },
1884 );
1885
1886 let mut right_levels = build_initial_compaction_group_levels(
1887 2,
1888 &CompactionConfig {
1889 max_level: 6,
1890 ..Default::default()
1891 },
1892 );
1893
1894 left_levels.levels[0] = Level {
1895 level_idx: 1,
1896 level_type: LevelType::Nonoverlapping,
1897 table_infos: vec![
1898 gen_sst_info(
1899 1,
1900 vec![3],
1901 FullKey::for_test(
1902 TableId::new(3),
1903 gen_key_from_str(VirtualNode::from_index(1), "1"),
1904 0,
1905 )
1906 .encode()
1907 .into(),
1908 FullKey::for_test(
1909 TableId::new(3),
1910 gen_key_from_str(VirtualNode::from_index(200), "1"),
1911 0,
1912 )
1913 .encode()
1914 .into(),
1915 ),
1916 gen_sst_info(
1917 10,
1918 vec![3, 4],
1919 FullKey::for_test(
1920 TableId::new(3),
1921 gen_key_from_str(VirtualNode::from_index(201), "1"),
1922 0,
1923 )
1924 .encode()
1925 .into(),
1926 FullKey::for_test(
1927 TableId::new(4),
1928 gen_key_from_str(VirtualNode::from_index(10), "1"),
1929 0,
1930 )
1931 .encode()
1932 .into(),
1933 ),
1934 gen_sst_info(
1935 11,
1936 vec![4],
1937 FullKey::for_test(
1938 TableId::new(4),
1939 gen_key_from_str(VirtualNode::from_index(11), "1"),
1940 0,
1941 )
1942 .encode()
1943 .into(),
1944 FullKey::for_test(
1945 TableId::new(4),
1946 gen_key_from_str(VirtualNode::from_index(200), "1"),
1947 0,
1948 )
1949 .encode()
1950 .into(),
1951 ),
1952 ],
1953 total_file_size: 300,
1954 ..Default::default()
1955 };
1956
1957 left_levels.l0.sub_levels.push(Level {
1958 level_idx: 0,
1959 table_infos: vec![gen_sst_info(
1960 3,
1961 vec![3],
1962 FullKey::for_test(
1963 TableId::new(3),
1964 gen_key_from_str(VirtualNode::from_index(1), "1"),
1965 0,
1966 )
1967 .encode()
1968 .into(),
1969 FullKey::for_test(
1970 TableId::new(3),
1971 gen_key_from_str(VirtualNode::from_index(200), "1"),
1972 0,
1973 )
1974 .encode()
1975 .into(),
1976 )],
1977 sub_level_id: 101,
1978 level_type: LevelType::Overlapping,
1979 total_file_size: 100,
1980 ..Default::default()
1981 });
1982
1983 left_levels.l0.sub_levels.push(Level {
1984 level_idx: 0,
1985 table_infos: vec![gen_sst_info(
1986 3,
1987 vec![3],
1988 FullKey::for_test(
1989 TableId::new(3),
1990 gen_key_from_str(VirtualNode::from_index(1), "1"),
1991 0,
1992 )
1993 .encode()
1994 .into(),
1995 FullKey::for_test(
1996 TableId::new(3),
1997 gen_key_from_str(VirtualNode::from_index(200), "1"),
1998 0,
1999 )
2000 .encode()
2001 .into(),
2002 )],
2003 sub_level_id: 103,
2004 level_type: LevelType::Overlapping,
2005 total_file_size: 100,
2006 ..Default::default()
2007 });
2008
2009 left_levels.l0.sub_levels.push(Level {
2010 level_idx: 0,
2011 table_infos: vec![gen_sst_info(
2012 3,
2013 vec![3],
2014 FullKey::for_test(
2015 TableId::new(3),
2016 gen_key_from_str(VirtualNode::from_index(1), "1"),
2017 0,
2018 )
2019 .encode()
2020 .into(),
2021 FullKey::for_test(
2022 TableId::new(3),
2023 gen_key_from_str(VirtualNode::from_index(200), "1"),
2024 0,
2025 )
2026 .encode()
2027 .into(),
2028 )],
2029 sub_level_id: 105,
2030 level_type: LevelType::Nonoverlapping,
2031 total_file_size: 100,
2032 ..Default::default()
2033 });
2034
2035 right_levels.levels[0] = Level {
2036 level_idx: 1,
2037 level_type: LevelType::Nonoverlapping,
2038 table_infos: vec![
2039 gen_sst_info(
2040 1,
2041 vec![5],
2042 FullKey::for_test(
2043 TableId::new(5),
2044 gen_key_from_str(VirtualNode::from_index(1), "1"),
2045 0,
2046 )
2047 .encode()
2048 .into(),
2049 FullKey::for_test(
2050 TableId::new(5),
2051 gen_key_from_str(VirtualNode::from_index(200), "1"),
2052 0,
2053 )
2054 .encode()
2055 .into(),
2056 ),
2057 gen_sst_info(
2058 10,
2059 vec![5, 6],
2060 FullKey::for_test(
2061 TableId::new(5),
2062 gen_key_from_str(VirtualNode::from_index(201), "1"),
2063 0,
2064 )
2065 .encode()
2066 .into(),
2067 FullKey::for_test(
2068 TableId::new(6),
2069 gen_key_from_str(VirtualNode::from_index(10), "1"),
2070 0,
2071 )
2072 .encode()
2073 .into(),
2074 ),
2075 gen_sst_info(
2076 11,
2077 vec![6],
2078 FullKey::for_test(
2079 TableId::new(6),
2080 gen_key_from_str(VirtualNode::from_index(11), "1"),
2081 0,
2082 )
2083 .encode()
2084 .into(),
2085 FullKey::for_test(
2086 TableId::new(6),
2087 gen_key_from_str(VirtualNode::from_index(200), "1"),
2088 0,
2089 )
2090 .encode()
2091 .into(),
2092 ),
2093 ],
2094 total_file_size: 300,
2095 ..Default::default()
2096 };
2097
        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                5,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 102,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 103,
            level_type: LevelType::Nonoverlapping,
            total_file_size: 100,
            ..Default::default()
        });

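        // Merging two empty `Levels` is a no-op.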
        {
            let mut left_levels = Levels::default();
            let right_levels = Levels::default();

            group_split::merge_levels(&mut left_levels, right_levels);
        }

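        // Merging the populated right group into a freshly built (empty) group moves all of the
        // right group's L0 sub-levels and bottom levels over unchanged.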
        {
            let mut left_levels = build_initial_compaction_group_levels(
                1,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

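        // Merging an empty right group into the populated left group leaves the left group as is.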
        {
            let mut left_levels = left_levels.clone();
            let right_levels = build_initial_compaction_group_levels(
                2,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

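        // Merging two populated groups: the right group's L0 sub-levels are appended after the
        // left group's and are assigned fresh sub_level_ids (106..=108), and the level-1 file
        // sizes add up (300 + 300 = 600).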
        {
            let mut left_levels = left_levels.clone();
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 6);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(600, left_levels.levels[0].total_file_size);
        }
    }

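    // `get_split_pos` over SSTs covering tables [1, 2], [3, 4, 5] and [6, 7]: splitting at
    // table 4 lands on index 1, and an empty SST list yields position 0.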
    #[test]
    fn test_get_split_pos() {
        let epoch = test_epoch(1);
        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
        let s3 = gen_sstable_info(3, vec![6, 7], epoch);

        let ssts = vec![s1, s2, s3];
        let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);

        let pos = group_split::get_split_pos(&ssts, split_key.clone());
        assert_eq!(1, pos);

        let pos = group_split::get_split_pos(&vec![], split_key);
        assert_eq!(0, pos);
    }

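    // Split a single SST covering tables [1, 2, 3, 5] at various table boundaries and check the
    // `need_to_split` / `split_sst` behavior for each case.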
    #[test]
    fn test_split_sst() {
        let epoch = test_epoch(1);
        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);

        {
            let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(
                origin_sst
                    .key_range
                    .right
                    .cmp(&branched_sst.key_range.left)
                    .is_le()
            );
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id());
        }

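        // Split at table 4: the table is not present in the SST, but the split key still falls
        // inside its key range, so both halves are produced and the branched half starts at
        // table 5.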
        {
            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id());
        }

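        // Split at table 6: the split key lies beyond the SST's last table, so the whole SST
        // belongs to the left group.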
        {
            let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
            let split_type = group_split::need_to_split(&sst, split_key);
            assert_eq!(SstSplitType::Left, split_type);
        }

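        // Splitting at a table inside the key range requires both halves, while splitting at the
        // first table id sends the whole SST to the right group.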
        {
            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Both, split_type);

            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
            let origin_sst = sst;
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Right, split_type);
        }

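        // Degenerate SST whose key range collapses to a single key: splitting it at that key
        // produces only the branched (right) half.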
        {
            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
            sst.key_range.right = sst.key_range.left.clone();
            let sst: SstableInfo = sst.into();
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
            let origin_sst = sst;
            let sst_size = origin_sst.sst_size;

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            assert!(origin_sst.is_none());
            assert!(branched_sst.is_some());
        }
    }

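    // Split whole levels at a table boundary with `split_sst_info_for_level_v2`: SSTs to the
    // right of the split key are drained into the returned vector, and an SST that straddles
    // the boundary is split in two.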
    #[test]
    fn test_split_sst_info_for_level() {
        let mut version = HummockVersion {
            id: HummockVersionId(0),
            levels: HashMap::from_iter([(
                1,
                build_initial_compaction_group_levels(
                    1,
                    &CompactionConfig {
                        max_level: 6,
                        ..Default::default()
                    },
                ),
            )]),
            ..Default::default()
        };

        let cg1 = version.levels.get_mut(&1).unwrap();

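        // Level 1 of the group: three non-overlapping SSTs covering tables 3 and 4; SST 10
        // spans both tables.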
        cg1.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![3],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![3, 4],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![4],
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

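        // One overlapping L0 sub-level (id 101) with five SSTs over tables 0..=2.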
        cg1.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![
                gen_sst_info(
                    2,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    22,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    23,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    24,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    25,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 300,
            ..Default::default()
        });

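        // Split the L0 sub-level at table 1; the SSTs returned for the right side are wrapped
        // into a new `Levels` and merged back into the group.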
        {
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.l0.sub_levels[0],
                &mut new_sst_id,
                split_key,
            );
            let mut right_l0 = OverlappingLevel {
                sub_levels: vec![],
                total_file_size: 0,
                uncompressed_file_size: 0,
            };

            right_l0.sub_levels.push(Level {
                level_idx: 0,
                table_infos: x,
                sub_level_id: 101,
                total_file_size: 100,
                level_type: LevelType::Overlapping,
                ..Default::default()
            });

            let right_levels = Levels {
                levels: vec![],
                l0: right_l0,
                ..Default::default()
            };

            merge_levels(cg1, right_levels);
        }

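        // Splitting a level that holds no SSTs yields nothing.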
        {
            let mut new_sst_id = 100.into();
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[2],
                &mut new_sst_id,
                split_key,
            );

            assert!(x.is_empty());
        }

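        // Split level 1 at table 1: the split key is left of every SST, so all three SSTs move
        // to the right side unchanged and the level is drained.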
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(3, x.len());
            assert_eq!(1, x[0].sst_id);
            assert_eq!(100, x[0].sst_size);
            assert_eq!(10, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(11, x[2].sst_id);
            assert_eq!(100, x[2].sst_size);

            assert_eq!(0, cg1.levels[0].table_infos.len());
        }

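        // Split level 1 at table 5: the split key is right of every SST, so nothing moves and
        // the level keeps all three SSTs.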
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(5.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(0, x.len());
            assert_eq!(3, cg1.levels[0].table_infos.len());
        }

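        // Split level 1 at table 4: SST 10 straddles the boundary and is physically split into
        // sst_id 100 (right half) and sst_id 101 (left half, table 3), each charged half of the
        // original sst_size; SST 11 moves wholly to the right and SST 1 stays on the left.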
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(2, x.len());
            assert_eq!(100, x[0].sst_id);
            assert_eq!(100 / 2, x[0].sst_size);
            assert_eq!(11, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(vec![TableId::new(4)], x[1].table_ids);

            assert_eq!(2, cg1.levels[0].table_infos.len());
            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
            assert_eq!(
                vec![TableId::new(3)],
                cg1.levels[0].table_infos[1].table_ids
            );
        }
    }
}