use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::iter::once;
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_pb::hummock::{
    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
};
use tracing::warn;

use super::group_split::split_sst_with_table_ids;
use super::{StateTableId, group_split};
use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
use crate::compact_task::is_compaction_task_expired;
use crate::compaction_group::StaticCompactionGroupId;
use crate::key_range::KeyRangeCommon;
use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
use crate::sstable_info::SstableInfo;
use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
use crate::vector_index::apply_vector_index_delta;
use crate::version::{
    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
};
use crate::{
    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
};

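/// Summary of the SST changes that applying one version delta makes to a compaction group:
/// the level the new SSTs are inserted into, the inserted SSTs themselves, and the object ids
/// of the SSTs removed from the group.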
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    pub insert_sst_level: u32,
    pub insert_sst_infos: Vec<SstableInfo>,
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}

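/// For a branched SST object, maps each compaction group that holds a branch of the object to
/// the SST ids of the branches in that group.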
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;

58impl<L> HummockVersionCommon<SstableInfo, L> {
59 pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
60 self.levels
61 .get(&compaction_group_id)
62 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
63 }
64
65 pub fn get_compaction_group_levels_mut(
66 &mut self,
67 compaction_group_id: CompactionGroupId,
68 ) -> &mut Levels {
69 self.levels
70 .get_mut(&compaction_group_id)
71 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
72 }
73
74 pub fn get_sst_ids_by_group_id(
76 &self,
77 compaction_group_id: CompactionGroupId,
78 ) -> impl Iterator<Item = HummockSstableId> + '_ {
79 self.levels
80 .iter()
81 .filter_map(move |(cg_id, level)| {
82 if *cg_id == compaction_group_id {
83 Some(level)
84 } else {
85 None
86 }
87 })
88 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
89 .flat_map(|level| level.table_infos.iter())
90 .map(|s| s.sst_id)
91 }
92
93 pub fn level_iter<F: FnMut(&Level) -> bool>(
94 &self,
95 compaction_group_id: CompactionGroupId,
96 mut f: F,
97 ) {
98 if let Some(levels) = self.levels.get(&compaction_group_id) {
99 for sub_level in &levels.l0.sub_levels {
100 if !f(sub_level) {
101 return;
102 }
103 }
104 for level in &levels.levels {
105 if !f(level) {
106 return;
107 }
108 }
109 }
110 }
111
112 pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
113 self.levels
115 .get(&compaction_group_id)
116 .map(|group| group.levels.len() + 1)
117 .unwrap_or(0)
118 }
119
120 pub fn safe_epoch_table_watermarks(
121 &self,
122 existing_table_ids: &[u32],
123 ) -> BTreeMap<u32, TableWatermarks> {
124 safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
125 }
126}
127
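/// Keeps, for each table in `existing_table_ids` that has watermarks, only the first
/// (earliest-epoch) watermark entry of its `TableWatermarks`; tables without watermarks are
/// omitted from the result.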
128pub fn safe_epoch_table_watermarks_impl(
129 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
130 existing_table_ids: &[u32],
131) -> BTreeMap<u32, TableWatermarks> {
132 fn extract_single_table_watermark(
133 table_watermarks: &TableWatermarks,
134 ) -> Option<TableWatermarks> {
135 if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
136 Some(TableWatermarks {
137 watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
138 direction: table_watermarks.direction,
139 watermark_type: table_watermarks.watermark_type,
140 })
141 } else {
142 None
143 }
144 }
145 table_watermarks
146 .iter()
147 .filter_map(|(table_id, table_watermarks)| {
148 let u32_table_id = table_id.table_id();
149 if !existing_table_ids.contains(&u32_table_id) {
150 None
151 } else {
152 extract_single_table_watermark(table_watermarks)
153 .map(|table_watermarks| (table_id.table_id, table_watermarks))
154 }
155 })
156 .collect()
157}
158
159pub fn safe_epoch_read_table_watermarks_impl(
160 safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
161) -> BTreeMap<TableId, ReadTableWatermark> {
162 safe_epoch_watermarks
163 .into_iter()
164 .map(|(table_id, watermarks)| {
165 assert_eq!(watermarks.watermarks.len(), 1);
166 let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
167 let mut vnode_watermark_map = BTreeMap::new();
168 for vnode_watermark in vnode_watermarks.iter() {
169 let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
170 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
171 assert!(
172 vnode_watermark_map
173 .insert(vnode, watermark.clone())
174 .is_none(),
175 "duplicate table watermark on vnode {}",
176 vnode.to_index()
177 );
178 }
179 }
180 (
181 TableId::from(table_id),
182 ReadTableWatermark {
183 direction: watermarks.direction,
184 vnode_watermarks: vnode_watermark_map,
185 },
186 )
187 })
188 .collect()
189}
190
191impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
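    /// Counts how many new SSTs splitting `parent_group_id` at `split_key` would create:
    /// every SST whose key range spans the split key is counted as two new SSTs.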
192 pub fn count_new_ssts_in_group_split(
193 &self,
194 parent_group_id: CompactionGroupId,
195 split_key: Bytes,
196 ) -> u64 {
197 self.levels
198 .get(&parent_group_id)
199 .map_or(0, |parent_levels| {
200 let l0 = &parent_levels.l0;
201 let mut split_count = 0;
202 for sub_level in &l0.sub_levels {
203 assert!(!sub_level.table_infos.is_empty());
204
205 if sub_level.level_type == PbLevelType::Overlapping {
206 split_count += sub_level
208 .table_infos
209 .iter()
210 .map(|sst| {
211 if let group_split::SstSplitType::Both =
212 group_split::need_to_split(sst, split_key.clone())
213 {
214 2
215 } else {
216 0
217 }
218 })
219 .sum::<u64>();
220 continue;
221 }
222
223 let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
224 let sst = sub_level.table_infos.get(pos).unwrap();
225
226 if let group_split::SstSplitType::Both =
227 group_split::need_to_split(sst, split_key.clone())
228 {
229 split_count += 2;
230 }
231 }
232
233 for level in &parent_levels.levels {
234 if level.table_infos.is_empty() {
235 continue;
236 }
237 let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
238 let sst = level.table_infos.get(pos).unwrap();
239 if let group_split::SstSplitType::Both =
240 group_split::need_to_split(sst, split_key.clone())
241 {
242 split_count += 2;
243 }
244 }
245
246 split_count
247 })
248 }
249
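    /// Initializes the newly created group `group_id` by branching the SSTs that contain any of
    /// `member_table_ids` out of `parent_group_id`. Branched SSTs get new SST ids allocated from
    /// `new_sst_start_id`, and SSTs left without any table id are dropped from the parent group.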
250 pub fn init_with_parent_group(
251 &mut self,
252 parent_group_id: CompactionGroupId,
253 group_id: CompactionGroupId,
254 member_table_ids: BTreeSet<StateTableId>,
255 new_sst_start_id: HummockSstableId,
256 ) {
257 let mut new_sst_id = new_sst_start_id;
258 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
259 if new_sst_start_id != 0 {
260 if cfg!(debug_assertions) {
261 panic!(
262 "non-zero sst start id {} for NewCompactionGroup",
263 new_sst_start_id
264 );
265 } else {
266 warn!(
267 %new_sst_start_id,
268 "non-zero sst start id for NewCompactionGroup"
269 );
270 }
271 }
272 return;
273 } else if !self.levels.contains_key(&parent_group_id) {
274 unreachable!(
275 "non-existing parent group id {} to init from",
276 parent_group_id
277 );
278 }
279 let [parent_levels, cur_levels] = self
280 .levels
281 .get_disjoint_mut([&parent_group_id, &group_id])
282 .map(|res| res.unwrap());
283 parent_levels.compaction_group_version_id += 1;
286 cur_levels.compaction_group_version_id += 1;
287 let l0 = &mut parent_levels.l0;
288 {
289 for sub_level in &mut l0.sub_levels {
290 let target_l0 = &mut cur_levels.l0;
291 let insert_table_infos =
294 split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
295 sub_level
296 .table_infos
297 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
298 .for_each(|sst_info| {
299 sub_level.total_file_size -= sst_info.sst_size;
300 sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
301 l0.total_file_size -= sst_info.sst_size;
302 l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
303 });
304 if insert_table_infos.is_empty() {
305 continue;
306 }
307 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
308 Ok(idx) => {
309 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
310 }
311 Err(idx) => {
312 insert_new_sub_level(
313 target_l0,
314 sub_level.sub_level_id,
315 sub_level.level_type,
316 insert_table_infos,
317 Some(idx),
318 );
319 }
320 }
321 }
322
323 l0.sub_levels.retain(|level| !level.table_infos.is_empty());
324 }
325 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
326 let insert_table_infos =
327 split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
328 cur_levels.levels[idx].total_file_size += insert_table_infos
329 .iter()
330 .map(|sst| sst.sst_size)
331 .sum::<u64>();
332 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
333 .iter()
334 .map(|sst| sst.uncompressed_file_size)
335 .sum::<u64>();
336 cur_levels.levels[idx]
337 .table_infos
338 .extend(insert_table_infos);
339 cur_levels.levels[idx]
340 .table_infos
341 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
342 assert!(can_concat(&cur_levels.levels[idx].table_infos));
343 level
344 .table_infos
345 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
346 .for_each(|sst_info| {
347 level.total_file_size -= sst_info.sst_size;
348 level.uncompressed_file_size -= sst_info.uncompressed_file_size;
349 });
350 }
351
352 assert!(
353 parent_levels
354 .l0
355 .sub_levels
356 .iter()
357 .all(|level| !level.table_infos.is_empty())
358 );
359 assert!(
360 cur_levels
361 .l0
362 .sub_levels
363 .iter()
364 .all(|level| !level.table_infos.is_empty())
365 );
366 }
367
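    /// Computes, for every compaction group touched by `version_delta`, which SSTs are inserted
    /// and which SST objects are deleted. Groups whose deltas are not purely `IntraLevel` /
    /// `NewL0SubLevel` changes are skipped, and a trivial-move delta yields no entries.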
368 pub fn build_sst_delta_infos(
369 &self,
370 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
371 ) -> Vec<SstDeltaInfo> {
372 let mut infos = vec![];
373
374 if version_delta.trivial_move {
377 return infos;
378 }
379
380 for (group_id, group_deltas) in &version_delta.group_deltas {
381 let mut info = SstDeltaInfo::default();
382
383 let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
384 let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
385
386 if !group_deltas.group_deltas.iter().all(|delta| {
388 matches!(
389 delta,
390 GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
391 )
392 }) {
393 continue;
394 }
395
396 for group_delta in &group_deltas.group_deltas {
400 match group_delta {
401 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
402 if !inserted_table_infos.is_empty() {
403 info.insert_sst_level = 0;
404 info.insert_sst_infos
405 .extend(inserted_table_infos.iter().cloned());
406 }
407 }
408 GroupDeltaCommon::IntraLevel(intra_level) => {
409 if !intra_level.inserted_table_infos.is_empty() {
410 info.insert_sst_level = intra_level.level_idx;
411 info.insert_sst_infos
412 .extend(intra_level.inserted_table_infos.iter().cloned());
413 }
414 if !intra_level.removed_table_ids.is_empty() {
415 for id in &intra_level.removed_table_ids {
416 if intra_level.level_idx == 0 {
417 removed_l0_ssts.insert(*id);
418 } else {
419 removed_ssts
420 .entry(intra_level.level_idx)
421 .or_default()
422 .insert(*id);
423 }
424 }
425 }
426 }
427 GroupDeltaCommon::GroupConstruct(_)
428 | GroupDeltaCommon::GroupDestroy(_)
429 | GroupDeltaCommon::GroupMerge(_) => {}
430 }
431 }
432
433 let group = self.levels.get(group_id).unwrap();
434 for l0_sub_level in &group.level0().sub_levels {
435 for sst_info in &l0_sub_level.table_infos {
436 if removed_l0_ssts.remove(&sst_info.sst_id) {
437 info.delete_sst_object_ids.push(sst_info.object_id);
438 }
439 }
440 }
441 for level in &group.levels {
442 if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
443 for sst_info in &level.table_infos {
444 if removed_level_ssts.remove(&sst_info.sst_id) {
445 info.delete_sst_object_ids.push(sst_info.object_id);
446 }
447 }
448 if !removed_level_ssts.is_empty() {
449 tracing::error!(
450 "removed_level_ssts is not empty: {:?}",
451 removed_level_ssts,
452 );
453 }
454 debug_assert!(removed_level_ssts.is_empty());
455 }
456 }
457
458 if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
459 tracing::error!(
460 "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
461 removed_l0_ssts,
462 removed_ssts
463 );
464 }
465 debug_assert!(removed_l0_ssts.is_empty());
466 debug_assert!(removed_ssts.is_empty());
467
468 infos.push(info);
469 }
470
471 infos
472 }
473
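    /// Applies `version_delta` on top of the current version: constructs, destroys and merges
    /// compaction groups, applies intra-level SST changes, and updates state table info, table
    /// watermarks, table change logs and vector indexes accordingly.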
474 pub fn apply_version_delta(
475 &mut self,
476 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
477 ) {
478 assert_eq!(self.id, version_delta.prev_id);
479
480 let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
481 &version_delta.state_table_info_delta,
482 &version_delta.removed_table_ids,
483 );
484
485 #[expect(deprecated)]
486 {
487 if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
488 is_commit_epoch = true;
489 tracing::trace!(
490 "max committed epoch bumped but no table committed epoch is changed"
491 );
492 }
493 }
494
495 for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
497 let mut is_applied_l0_compact = false;
498 for group_delta in &group_deltas.group_deltas {
499 match group_delta {
500 GroupDeltaCommon::GroupConstruct(group_construct) => {
501 let mut new_levels = build_initial_compaction_group_levels(
502 *compaction_group_id,
503 group_construct.get_group_config().unwrap(),
504 );
505 let parent_group_id = group_construct.parent_group_id;
506 new_levels.parent_group_id = parent_group_id;
507 #[expect(deprecated)]
508 new_levels
510 .member_table_ids
511 .clone_from(&group_construct.table_ids);
512 self.levels.insert(*compaction_group_id, new_levels);
513 let member_table_ids = if group_construct.version()
514 >= CompatibilityVersion::NoMemberTableIds
515 {
516 self.state_table_info
517 .compaction_group_member_table_ids(*compaction_group_id)
518 .iter()
519 .map(|table_id| table_id.table_id)
520 .collect()
521 } else {
522 #[expect(deprecated)]
523 BTreeSet::from_iter(group_construct.table_ids.clone())
525 };
526
527 if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
528 let split_key = if group_construct.split_key.is_some() {
529 Some(Bytes::from(group_construct.split_key.clone().unwrap()))
530 } else {
531 None
532 };
533 self.init_with_parent_group_v2(
534 parent_group_id,
535 *compaction_group_id,
536 group_construct.get_new_sst_start_id().into(),
537 split_key.clone(),
538 );
539 } else {
540 self.init_with_parent_group(
542 parent_group_id,
543 *compaction_group_id,
544 member_table_ids,
545 group_construct.get_new_sst_start_id().into(),
546 );
547 }
548 }
549 GroupDeltaCommon::GroupMerge(group_merge) => {
550 tracing::info!(
551 "group_merge left {:?} right {:?}",
552 group_merge.left_group_id,
553 group_merge.right_group_id
554 );
555 self.merge_compaction_group(
556 group_merge.left_group_id,
557 group_merge.right_group_id,
558 )
559 }
560 GroupDeltaCommon::IntraLevel(level_delta) => {
561 let levels =
562 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
563 panic!("compaction group {} does not exist", compaction_group_id)
564 });
565 if is_commit_epoch {
566 assert!(
567 level_delta.removed_table_ids.is_empty(),
568 "no sst should be deleted when committing an epoch"
569 );
570
571 let IntraLevelDelta {
572 level_idx,
573 l0_sub_level_id,
574 inserted_table_infos,
575 ..
576 } = level_delta;
577 {
578 assert_eq!(
579 *level_idx, 0,
580 "we should only add to L0 when we commit an epoch."
581 );
582 if !inserted_table_infos.is_empty() {
583 insert_new_sub_level(
584 &mut levels.l0,
585 *l0_sub_level_id,
586 PbLevelType::Overlapping,
587 inserted_table_infos.clone(),
588 None,
589 );
590 }
591 }
592 } else {
593 levels.apply_compact_ssts(
595 level_delta,
596 self.state_table_info
597 .compaction_group_member_table_ids(*compaction_group_id),
598 );
599 if level_delta.level_idx == 0 {
600 is_applied_l0_compact = true;
601 }
602 }
603 }
604 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
605 let levels =
606 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
607 panic!("compaction group {} does not exist", compaction_group_id)
608 });
609 assert!(is_commit_epoch);
610
611 if !inserted_table_infos.is_empty() {
612 let next_l0_sub_level_id = levels
613 .l0
614 .sub_levels
615 .last()
616 .map(|level| level.sub_level_id + 1)
617 .unwrap_or(1);
618
619 insert_new_sub_level(
620 &mut levels.l0,
621 next_l0_sub_level_id,
622 PbLevelType::Overlapping,
623 inserted_table_infos.clone(),
624 None,
625 );
626 }
627 }
628 GroupDeltaCommon::GroupDestroy(_) => {
629 self.levels.remove(compaction_group_id);
630 }
631 }
632 }
633 if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
634 {
635 levels.post_apply_l0_compact();
636 }
637 }
638 self.id = version_delta.id;
639 #[expect(deprecated)]
640 {
641 self.max_committed_epoch = version_delta.max_committed_epoch;
642 }
643
644 let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
648 HashMap::new();
649
650 for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
652 if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
653 if version_delta.removed_table_ids.contains(table_id) {
654 modified_table_watermarks.insert(*table_id, None);
655 } else {
656 let mut current_table_watermarks = (**current_table_watermarks).clone();
657 current_table_watermarks.apply_new_table_watermarks(table_watermarks);
658 modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
659 }
660 } else {
661 modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
662 }
663 }
664 for (table_id, table_watermarks) in &self.table_watermarks {
665 let safe_epoch = if let Some(state_table_info) =
666 self.state_table_info.info().get(table_id)
667 && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
668 && state_table_info.committed_epoch > *oldest_epoch
669 {
670 state_table_info.committed_epoch
672 } else {
673 continue;
675 };
676 let table_watermarks = modified_table_watermarks
677 .entry(*table_id)
678 .or_insert_with(|| Some((**table_watermarks).clone()));
679 if let Some(table_watermarks) = table_watermarks {
680 table_watermarks.clear_stale_epoch_watermark(safe_epoch);
681 }
682 }
683 for (table_id, table_watermarks) in modified_table_watermarks {
685 if let Some(table_watermarks) = table_watermarks {
686 self.table_watermarks
687 .insert(table_id, Arc::new(table_watermarks));
688 } else {
689 self.table_watermarks.remove(&table_id);
690 }
691 }
692
693 Self::apply_change_log_delta(
695 &mut self.table_change_log,
696 &version_delta.change_log_delta,
697 &version_delta.removed_table_ids,
698 &version_delta.state_table_info_delta,
699 &changed_table_info,
700 );
701
702 apply_vector_index_delta(
704 &mut self.vector_indexes,
705 &version_delta.vector_index_delta,
706 &version_delta.removed_table_ids,
707 );
708 }
709
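    /// Applies `change_log_delta` to `table_change_log`: appends new change log entries, drops
    /// the logs of removed tables (or of tables that committed a new epoch without a matching
    /// change log entry), and truncates each remaining log to its `truncate_epoch`.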
710 pub fn apply_change_log_delta<T: Clone>(
711 table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
712 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
713 removed_table_ids: &HashSet<TableId>,
714 state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
715 changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
716 ) {
717 for (table_id, change_log_delta) in change_log_delta {
718 let new_change_log = &change_log_delta.new_log;
719 match table_change_log.entry(*table_id) {
720 Entry::Occupied(entry) => {
721 let change_log = entry.into_mut();
722 change_log.add_change_log(new_change_log.clone());
723 }
724 Entry::Vacant(entry) => {
725 entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
726 }
727 };
728 }
729
        table_change_log.retain(|table_id, _| {
            if removed_table_ids.contains(table_id) {
                return false;
            }
            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
                && table_info_delta.committed_epoch > prev_table_info.committed_epoch
            {
                // The table existed before this delta and commits a new epoch in it, so its
                // change log must also advance; otherwise the log is dropped below.
            } else {
                return true;
            }
            let contains = change_log_delta.contains_key(table_id);
            if !contains {
                warn!(
                    ?table_id,
                    "table change log dropped due to no further change log at newly committed epoch",
                );
            }
            contains
        });
753
754 for (table_id, change_log_delta) in change_log_delta {
756 if let Some(change_log) = table_change_log.get_mut(table_id) {
757 change_log.truncate(change_log_delta.truncate_epoch);
758 }
759 }
760 }
761
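    /// Collects every SST whose `sst_id` differs from its `object_id` (i.e. a branched SST) and
    /// groups the branch SST ids by object id and compaction group.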
762 pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
763 let mut ret: BTreeMap<_, _> = BTreeMap::new();
764 for (compaction_group_id, group) in &self.levels {
765 let mut levels = vec![];
766 levels.extend(group.l0.sub_levels.iter());
767 levels.extend(group.levels.iter());
768 for level in levels {
769 for table_info in &level.table_infos {
770 if table_info.sst_id.inner() == table_info.object_id.inner() {
771 continue;
772 }
773 let object_id = table_info.object_id;
774 let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
775 entry
776 .entry(*compaction_group_id)
777 .or_default()
778 .push(table_info.sst_id)
779 }
780 }
781 }
782 ret
783 }
784
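    /// Merges the levels of `right_group_id` into `left_group_id` and removes the right group.
    /// Asserts that the member table ids of the left group all precede those of the right group,
    /// i.e. their concatenation stays sorted.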
785 pub fn merge_compaction_group(
786 &mut self,
787 left_group_id: CompactionGroupId,
788 right_group_id: CompactionGroupId,
789 ) {
790 let left_group_id_table_ids = self
792 .state_table_info
793 .compaction_group_member_table_ids(left_group_id)
794 .iter()
795 .map(|table_id| table_id.table_id);
796 let right_group_id_table_ids = self
797 .state_table_info
798 .compaction_group_member_table_ids(right_group_id)
799 .iter()
800 .map(|table_id| table_id.table_id);
801
802 assert!(
803 left_group_id_table_ids
804 .chain(right_group_id_table_ids)
805 .is_sorted()
806 );
807
808 let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
809 let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
810 panic!(
811 "compaction group should exist right {} all {:?}",
812 right_group_id, total_cg
813 )
814 });
815
816 let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
817 panic!(
818 "compaction group should exist left {} all {:?}",
819 left_group_id, total_cg
820 )
821 });
822
823 group_split::merge_levels(left_levels, right_levels);
824 }
825
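    /// Variant of `init_with_parent_group` used since `CompatibilityVersion::SplitGroupByTableId`:
    /// instead of a member table id set, the parent group is split at an explicit `split_key`.
    /// When `split_key` is `None`, no SSTs are moved.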
826 pub fn init_with_parent_group_v2(
827 &mut self,
828 parent_group_id: CompactionGroupId,
829 group_id: CompactionGroupId,
830 new_sst_start_id: HummockSstableId,
831 split_key: Option<Bytes>,
832 ) {
833 let mut new_sst_id = new_sst_start_id;
834 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
835 if new_sst_start_id != 0 {
836 if cfg!(debug_assertions) {
837 panic!(
838 "non-zero sst start id {} for NewCompactionGroup",
839 new_sst_start_id
840 );
841 } else {
842 warn!(
843 %new_sst_start_id,
844 "non-zero sst start id for NewCompactionGroup"
845 );
846 }
847 }
848 return;
849 } else if !self.levels.contains_key(&parent_group_id) {
850 unreachable!(
851 "non-existing parent group id {} to init from (V2)",
852 parent_group_id
853 );
854 }
855
856 let [parent_levels, cur_levels] = self
857 .levels
858 .get_disjoint_mut([&parent_group_id, &group_id])
859 .map(|res| res.unwrap());
860 parent_levels.compaction_group_version_id += 1;
863 cur_levels.compaction_group_version_id += 1;
864
865 let l0 = &mut parent_levels.l0;
866 {
867 for sub_level in &mut l0.sub_levels {
868 let target_l0 = &mut cur_levels.l0;
869 let insert_table_infos = if let Some(split_key) = &split_key {
872 group_split::split_sst_info_for_level_v2(
873 sub_level,
874 &mut new_sst_id,
875 split_key.clone(),
876 )
877 } else {
878 vec![]
879 };
880
881 if insert_table_infos.is_empty() {
882 continue;
883 }
884
885 sub_level
886 .table_infos
887 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
888 .for_each(|sst_info| {
889 sub_level.total_file_size -= sst_info.sst_size;
890 sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
891 l0.total_file_size -= sst_info.sst_size;
892 l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
893 });
894 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
895 Ok(idx) => {
896 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
897 }
898 Err(idx) => {
899 insert_new_sub_level(
900 target_l0,
901 sub_level.sub_level_id,
902 sub_level.level_type,
903 insert_table_infos,
904 Some(idx),
905 );
906 }
907 }
908 }
909
910 l0.sub_levels.retain(|level| !level.table_infos.is_empty());
911 }
912
913 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
914 let insert_table_infos = if let Some(split_key) = &split_key {
915 group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
916 } else {
917 vec![]
918 };
919
920 if insert_table_infos.is_empty() {
921 continue;
922 }
923
924 cur_levels.levels[idx].total_file_size += insert_table_infos
925 .iter()
926 .map(|sst| sst.sst_size)
927 .sum::<u64>();
928 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
929 .iter()
930 .map(|sst| sst.uncompressed_file_size)
931 .sum::<u64>();
932 cur_levels.levels[idx]
933 .table_infos
934 .extend(insert_table_infos);
935 cur_levels.levels[idx]
936 .table_infos
937 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
938 assert!(can_concat(&cur_levels.levels[idx].table_infos));
939 level
940 .table_infos
941 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
942 .for_each(|sst_info| {
943 level.total_file_size -= sst_info.sst_size;
944 level.uncompressed_file_size -= sst_info.uncompressed_file_size;
945 });
946 }
947
948 assert!(
949 parent_levels
950 .l0
951 .sub_levels
952 .iter()
953 .all(|level| !level.table_infos.is_empty())
954 );
955 assert!(
956 cur_levels
957 .l0
958 .sub_levels
959 .iter()
960 .all(|level| !level.table_infos.is_empty())
961 );
962 }
963}
964
965impl<T> HummockVersionCommon<T>
966where
967 T: SstableIdReader + ObjectIdReader,
968{
969 pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
970 self.get_sst_infos(exclude_change_log)
971 .map(|s| s.object_id())
972 .collect()
973 }
974
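    /// Returns all object ids referenced by this version: SST objects from all levels (and, unless
    /// `exclude_change_log` is set, from table change logs) plus vector index file objects.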
975 pub fn get_object_ids(
976 &self,
977 exclude_change_log: bool,
978 ) -> impl Iterator<Item = HummockObjectId> + '_ {
        // Exhaustive match as a compile-time reminder: adding a new `HummockObjectId` variant
        // requires extending the object id collection below.
        match HummockObjectId::Sstable(0.into()) {
            HummockObjectId::Sstable(_) => {}
            HummockObjectId::VectorFile(_) => {}
        };
986 self.get_sst_infos(exclude_change_log)
987 .map(|s| HummockObjectId::Sstable(s.object_id()))
988 .chain(
989 self.vector_indexes
990 .values()
991 .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
992 )
993 }
994
995 pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
996 self.get_sst_infos(exclude_change_log)
997 .map(|s| s.sst_id())
998 .collect()
999 }
1000
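    /// Iterates over all `SstableInfo`s referenced by this version's levels, and, unless
    /// `exclude_change_log` is set, those referenced by table change logs as well.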
1001 pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
1002 let may_table_change_log = if exclude_change_log {
1003 None
1004 } else {
1005 Some(self.table_change_log.values())
1006 };
1007 self.get_combined_levels()
1008 .flat_map(|level| level.table_infos.iter())
1009 .chain(
1010 may_table_change_log
1011 .map(|v| {
1012 v.flat_map(|table_change_log| {
1013 table_change_log.iter().flat_map(|epoch_change_log| {
1014 epoch_change_log
1015 .old_value
1016 .iter()
1017 .chain(epoch_change_log.new_value.iter())
1018 })
1019 })
1020 })
1021 .into_iter()
1022 .flatten(),
1023 )
1024 }
1025}
1026
1027impl Levels {
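    /// Applies a compaction-generated `IntraLevelDelta`: removes the compacted input SSTs and
    /// inserts the output SSTs into the target (sub-)level. The delta is ignored if it was
    /// produced by an expired compaction task, detected via the compaction group version id.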
1028 pub(crate) fn apply_compact_ssts(
1029 &mut self,
1030 level_delta: &IntraLevelDeltaCommon<SstableInfo>,
1031 member_table_ids: &BTreeSet<TableId>,
1032 ) {
1033 let IntraLevelDeltaCommon {
1034 level_idx,
1035 l0_sub_level_id,
1036 inserted_table_infos: insert_table_infos,
1037 vnode_partition_count,
1038 removed_table_ids: delete_sst_ids_set,
1039 compaction_group_version_id,
1040 } = level_delta;
1041 let new_vnode_partition_count = *vnode_partition_count;
1042
1043 if is_compaction_task_expired(
1044 self.compaction_group_version_id,
1045 *compaction_group_version_id,
1046 ) {
1047 warn!(
1048 current_compaction_group_version_id = self.compaction_group_version_id,
1049 delta_compaction_group_version_id = compaction_group_version_id,
1050 level_idx,
1051 l0_sub_level_id,
1052 insert_table_infos = ?insert_table_infos
1053 .iter()
1054 .map(|sst| (sst.sst_id, sst.object_id))
1055 .collect_vec(),
1056 ?delete_sst_ids_set,
1057 "This VersionDelta may be committed by an expired compact task. Please check it."
1058 );
1059 return;
1060 }
1061 if !delete_sst_ids_set.is_empty() {
1062 if *level_idx == 0 {
1063 for level in &mut self.l0.sub_levels {
1064 level_delete_ssts(level, delete_sst_ids_set);
1065 }
1066 } else {
1067 let idx = *level_idx as usize - 1;
1068 level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
1069 }
1070 }
1071
1072 if !insert_table_infos.is_empty() {
1073 let insert_sst_level_id = *level_idx;
1074 let insert_sub_level_id = *l0_sub_level_id;
1075 if insert_sst_level_id == 0 {
1076 let l0 = &mut self.l0;
1077 let index = l0
1078 .sub_levels
1079 .partition_point(|level| level.sub_level_id < insert_sub_level_id);
1080 assert!(
1081 index < l0.sub_levels.len()
1082 && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
1083 "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
1084 insert_sub_level_id,
1085 delete_sst_ids_set,
1086 l0.sub_levels
1087 .iter()
1088 .map(|level| level.sub_level_id)
1089 .collect_vec()
1090 );
1091 if l0.sub_levels[index].table_infos.is_empty()
1092 && member_table_ids.len() == 1
1093 && insert_table_infos.iter().all(|sst| {
1094 sst.table_ids.len() == 1
1095 && sst.table_ids[0]
1096 == member_table_ids.iter().next().expect("non-empty").table_id
1097 })
1098 {
1099 l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
1102 }
1103 level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
1104 } else {
1105 let idx = insert_sst_level_id as usize - 1;
1106 if self.levels[idx].table_infos.is_empty()
1107 && insert_table_infos
1108 .iter()
1109 .all(|sst| sst.table_ids.len() == 1)
1110 {
1111 self.levels[idx].vnode_partition_count = new_vnode_partition_count;
1112 } else if self.levels[idx].vnode_partition_count != 0
1113 && new_vnode_partition_count == 0
1114 && member_table_ids.len() > 1
1115 {
1116 self.levels[idx].vnode_partition_count = 0;
1117 }
1118 level_insert_ssts(&mut self.levels[idx], insert_table_infos);
1119 }
1120 }
1121 }
1122
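    /// Drops L0 sub levels emptied by compaction and recomputes L0's aggregated size statistics.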
1123 pub(crate) fn post_apply_l0_compact(&mut self) {
1124 {
1125 self.l0
1126 .sub_levels
1127 .retain(|level| !level.table_infos.is_empty());
1128 self.l0.total_file_size = self
1129 .l0
1130 .sub_levels
1131 .iter()
1132 .map(|level| level.total_file_size)
1133 .sum::<u64>();
1134 self.l0.uncompressed_file_size = self
1135 .l0
1136 .sub_levels
1137 .iter()
1138 .map(|level| level.uncompressed_file_size)
1139 .sum::<u64>();
1140 }
1141 }
1142}
1143
1144impl<T, L> HummockVersionCommon<T, L> {
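    /// Iterates over all levels of all compaction groups, visiting L0 sub levels first (newest to
    /// oldest) and then the non-L0 levels.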
1145 pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1146 self.levels
1147 .values()
1148 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1149 }
1150}
1151
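/// Builds an empty `Levels` for a new compaction group: `max_level` empty non-overlapping levels
/// plus an empty L0.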
1152pub fn build_initial_compaction_group_levels(
1153 group_id: CompactionGroupId,
1154 compaction_config: &CompactionConfig,
1155) -> Levels {
1156 let mut levels = vec![];
1157 for l in 0..compaction_config.get_max_level() {
1158 levels.push(Level {
1159 level_idx: (l + 1) as u32,
1160 level_type: PbLevelType::Nonoverlapping,
1161 table_infos: vec![],
1162 total_file_size: 0,
1163 sub_level_id: 0,
1164 uncompressed_file_size: 0,
1165 vnode_partition_count: 0,
1166 });
1167 }
    #[expect(deprecated)]
    Levels {
1170 levels,
1171 l0: OverlappingLevel {
1172 sub_levels: vec![],
1173 total_file_size: 0,
1174 uncompressed_file_size: 0,
1175 },
1176 group_id,
1177 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
1178 member_table_ids: vec![],
1179 compaction_group_version_id: 0,
1180 }
1181}
1182
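/// For every SST in `level` that contains any of `member_table_ids`, branches the SST via
/// `split_sst_with_table_ids` and returns the branched SSTs to be inserted into the new group.
/// The original SSTs in `level` are modified in place.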
1183fn split_sst_info_for_level(
1184 member_table_ids: &BTreeSet<u32>,
1185 level: &mut Level,
1186 new_sst_id: &mut HummockSstableId,
1187) -> Vec<SstableInfo> {
1188 let mut insert_table_infos = vec![];
1191 for sst_info in &mut level.table_infos {
1192 let removed_table_ids = sst_info
1193 .table_ids
1194 .iter()
1195 .filter(|table_id| member_table_ids.contains(table_id))
1196 .cloned()
1197 .collect_vec();
        let sst_size = sst_info.sst_size;
        // `sst_size / 2` is used below as the size of each branch; warn if it would round to zero.
        if sst_size / 2 == 0 {
            tracing::warn!(
                id = %sst_info.sst_id,
                object_id = %sst_info.object_id,
                sst_size = sst_info.sst_size,
                file_size = sst_info.file_size,
                "Sstable sst_size is smaller than expected",
            );
        };
1208 if !removed_table_ids.is_empty() {
1209 let (modified_sst, branch_sst) = split_sst_with_table_ids(
1210 sst_info,
1211 new_sst_id,
1212 sst_size / 2,
1213 sst_size / 2,
1214 member_table_ids.iter().cloned().collect_vec(),
1215 );
1216 *sst_info = modified_sst;
1217 insert_table_infos.push(branch_sst);
1218 }
1219 }
1220 insert_table_infos
1221}
1222
1223pub fn get_compaction_group_ids(
1225 version: &HummockVersion,
1226) -> impl Iterator<Item = CompactionGroupId> + '_ {
1227 version.levels.keys().cloned()
1228}
1229
1230pub fn get_table_compaction_group_id_mapping(
1231 version: &HummockVersion,
1232) -> HashMap<StateTableId, CompactionGroupId> {
1233 version
1234 .state_table_info
1235 .info()
1236 .iter()
1237 .map(|(table_id, info)| (table_id.table_id, info.compaction_group_id))
1238 .collect()
1239}
1240
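/// Iterates over the `(object_id, sst_id)` pairs of all SSTs in compaction group `group_id`.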
1241pub fn get_compaction_group_ssts(
1243 version: &HummockVersion,
1244 group_id: CompactionGroupId,
1245) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1246 let group_levels = version.get_compaction_group_levels(group_id);
1247 group_levels
1248 .l0
1249 .sub_levels
1250 .iter()
1251 .rev()
1252 .chain(group_levels.levels.iter())
1253 .flat_map(|level| {
1254 level
1255 .table_infos
1256 .iter()
1257 .map(|table_info| (table_info.object_id, table_info.sst_id))
1258 })
1259}
1260
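/// Builds a new L0 sub level from `table_infos`, computing its size statistics. For a
/// non-overlapping level, the SSTs must already be sorted and concat-able.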
1261pub fn new_sub_level(
1262 sub_level_id: u64,
1263 level_type: PbLevelType,
1264 table_infos: Vec<SstableInfo>,
1265) -> Level {
1266 if level_type == PbLevelType::Nonoverlapping {
1267 debug_assert!(
1268 can_concat(&table_infos),
1269 "sst of non-overlapping level is not concat-able: {:?}",
1270 table_infos
1271 );
1272 }
1273 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1274 let uncompressed_file_size = table_infos
1275 .iter()
1276 .map(|table| table.uncompressed_file_size)
1277 .sum();
1278 Level {
1279 level_idx: 0,
1280 level_type,
1281 table_infos,
1282 total_file_size,
1283 sub_level_id,
1284 uncompressed_file_size,
1285 vnode_partition_count: 0,
1286 }
1287}
1288
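/// Adds `insert_table_infos` to an existing L0 sub level, updating both the sub level's and L0's
/// size statistics; a non-overlapping sub level is re-sorted and re-validated afterwards.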
1289pub fn add_ssts_to_sub_level(
1290 l0: &mut OverlappingLevel,
1291 sub_level_idx: usize,
1292 insert_table_infos: Vec<SstableInfo>,
1293) {
1294 insert_table_infos.iter().for_each(|sst| {
1295 l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1296 l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1297 l0.total_file_size += sst.sst_size;
1298 l0.uncompressed_file_size += sst.uncompressed_file_size;
1299 });
1300 l0.sub_levels[sub_level_idx]
1301 .table_infos
1302 .extend(insert_table_infos);
1303 if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1304 l0.sub_levels[sub_level_idx]
1305 .table_infos
1306 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1307 assert!(
1308 can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1309 "sstable ids: {:?}",
1310 l0.sub_levels[sub_level_idx]
1311 .table_infos
1312 .iter()
1313 .map(|sst| sst.sst_id)
1314 .collect_vec()
1315 );
1316 }
1317}
1318
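/// Inserts `insert_table_infos` as a brand-new L0 sub level with id `insert_sub_level_id`, either
/// at `sub_level_insert_hint` or appended as the newest sub level. A `u64::MAX` sub level id is
/// treated as invalid and ignored.
///
/// A minimal sketch of the common "commit epoch" usage (illustrative only, not compiled here):
///
/// ```ignore
/// // Append the freshly committed SSTs as the newest overlapping sub level.
/// insert_new_sub_level(&mut levels.l0, next_sub_level_id, PbLevelType::Overlapping, ssts, None);
/// ```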
1319pub fn insert_new_sub_level(
1321 l0: &mut OverlappingLevel,
1322 insert_sub_level_id: u64,
1323 level_type: PbLevelType,
1324 insert_table_infos: Vec<SstableInfo>,
1325 sub_level_insert_hint: Option<usize>,
1326) {
1327 if insert_sub_level_id == u64::MAX {
1328 return;
1329 }
1330 let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1331 insert_pos
1332 } else {
1333 if let Some(newest_level) = l0.sub_levels.last() {
1334 assert!(
1335 newest_level.sub_level_id < insert_sub_level_id,
1336 "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1337 newest_level.sub_level_id,
1338 insert_sub_level_id,
1339 l0,
1340 );
1341 }
1342 l0.sub_levels.len()
1343 };
1344 #[cfg(debug_assertions)]
1345 {
1346 if insert_pos > 0
1347 && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1348 {
1349 debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1350 }
1351 if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1352 debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1353 }
1354 }
1355 let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1358 l0.total_file_size += level.total_file_size;
1359 l0.uncompressed_file_size += level.uncompressed_file_size;
1360 l0.sub_levels.insert(insert_pos, level);
1361}
1362
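/// Removes from `operand` every SST whose id appears in `delete_sst_ids_superset` and recomputes
/// the level's size statistics. Returns whether anything was removed.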
1363fn level_delete_ssts(
1367 operand: &mut Level,
1368 delete_sst_ids_superset: &HashSet<HummockSstableId>,
1369) -> bool {
1370 let original_len = operand.table_infos.len();
1371 operand
1372 .table_infos
1373 .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1374 operand.total_file_size = operand
1375 .table_infos
1376 .iter()
1377 .map(|table| table.sst_size)
1378 .sum::<u64>();
1379 operand.uncompressed_file_size = operand
1380 .table_infos
1381 .iter()
1382 .map(|table| table.uncompressed_file_size)
1383 .sum::<u64>();
1384 original_len != operand.table_infos.len()
1385}
1386
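/// Inserts `insert_table_infos` into a non-L0 level (or an L0 sub level), keeping the level
/// sorted and concat-able. An overlapping level is converted to non-overlapping in the process.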
1387fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1388 fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1389 format!(
1390 "sstable ids: {:?}",
1391 ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1392 )
1393 }
1394 operand.total_file_size += insert_table_infos
1395 .iter()
1396 .map(|sst| sst.sst_size)
1397 .sum::<u64>();
1398 operand.uncompressed_file_size += insert_table_infos
1399 .iter()
1400 .map(|sst| sst.uncompressed_file_size)
1401 .sum::<u64>();
1402 if operand.level_type == PbLevelType::Overlapping {
1403 operand.level_type = PbLevelType::Nonoverlapping;
1404 operand
1405 .table_infos
1406 .extend(insert_table_infos.iter().cloned());
1407 operand
1408 .table_infos
1409 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1410 assert!(
1411 can_concat(&operand.table_infos),
1412 "{}",
1413 display_sstable_infos(&operand.table_infos)
1414 );
1415 } else if !insert_table_infos.is_empty() {
1416 let sorted_insert: Vec<_> = insert_table_infos
1417 .iter()
1418 .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1419 .cloned()
1420 .collect();
1421 let first = &sorted_insert[0];
1422 let last = &sorted_insert[sorted_insert.len() - 1];
1423 let pos = operand
1424 .table_infos
1425 .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1426 if pos >= operand.table_infos.len()
1427 || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1428 {
1429 operand.table_infos.splice(pos..pos, sorted_insert);
1430 let validate_range = operand
1432 .table_infos
1433 .iter()
1434 .skip(pos.saturating_sub(1))
1435 .take(insert_table_infos.len() + 2)
1436 .collect_vec();
1437 assert!(
1438 can_concat(&validate_range),
1439 "{}",
1440 display_sstable_infos(&validate_range),
1441 );
1442 } else {
1443 warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1446 for i in insert_table_infos {
1447 let pos = operand
1448 .table_infos
1449 .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1450 operand.table_infos.insert(pos, i.clone());
1451 }
1452 assert!(
1453 can_concat(&operand.table_infos),
1454 "{}",
1455 display_sstable_infos(&operand.table_infos)
1456 );
1457 }
1458 }
1459}
1460
1461pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
    // Exhaustive match as a compile-time reminder: adding a new `HummockObjectId` variant
    // requires extending the size collection below.
    match HummockObjectId::Sstable(0.into()) {
        HummockObjectId::Sstable(_) => {}
        HummockObjectId::VectorFile(_) => {}
    };
1469 version
1470 .levels
1471 .values()
1472 .flat_map(|cg| {
1473 cg.level0()
1474 .sub_levels
1475 .iter()
1476 .chain(cg.levels.iter())
1477 .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1478 })
1479 .chain(version.table_change_log.values().flat_map(|c| {
1480 c.iter().flat_map(|l| {
1481 l.old_value
1482 .iter()
1483 .chain(l.new_value.iter())
1484 .map(|t| (t.object_id, t.file_size))
1485 })
1486 }))
1487 .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1488 .chain(
1489 version
1490 .vector_indexes
1491 .values()
1492 .flat_map(|index| index.get_objects()),
1493 )
1494 .collect()
1495}
1496
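/// Performs sanity checks on `version` (consistent group ids, level indexes, sub level ordering,
/// sorted table ids and non-overlapping key ranges) and returns a list of human-readable
/// violations; an empty list means the version passed validation.
///
/// A minimal usage sketch (illustrative only, not compiled here):
///
/// ```ignore
/// for msg in validate_version(&version) {
///     tracing::warn!("invalid hummock version: {msg}");
/// }
/// ```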
1497pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1500 let mut res = Vec::new();
1501 for (group_id, levels) in &version.levels {
1503 if levels.group_id != *group_id {
1505 res.push(format!(
1506 "GROUP {}: inconsistent group id {} in Levels",
1507 group_id, levels.group_id
1508 ));
1509 }
1510
1511 let validate_level = |group: CompactionGroupId,
1512 expected_level_idx: u32,
1513 level: &Level,
1514 res: &mut Vec<String>| {
1515 let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1516 if level.level_idx == 0 {
                level_identifier.push_str(format!(" SUBLEVEL {}", level.sub_level_id).as_str());
1518 if level.table_infos.is_empty() {
1520 res.push(format!("{}: empty level", level_identifier));
1521 }
1522 } else if level.level_type != PbLevelType::Nonoverlapping {
1523 res.push(format!(
1525 "{}: level type {:?} is not non-overlapping",
1526 level_identifier, level.level_type
1527 ));
1528 }
1529
1530 if level.level_idx != expected_level_idx {
1532 res.push(format!(
1533 "{}: mismatched level idx {}",
1534 level_identifier, expected_level_idx
1535 ));
1536 }
1537
1538 let mut prev_table_info: Option<&SstableInfo> = None;
1539 for table_info in &level.table_infos {
1540 if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1542 res.push(format!(
1543 "{} SST {}: table_ids not sorted",
1544 level_identifier, table_info.object_id
1545 ));
1546 }
1547
1548 if level.level_type == PbLevelType::Nonoverlapping {
1550 if let Some(prev) = prev_table_info.take()
1551 && prev
1552 .key_range
1553 .compare_right_with(&table_info.key_range.left)
1554 != Ordering::Less
1555 {
1556 res.push(format!(
1557 "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1558 level_identifier, table_info.object_id, prev, table_info
1559 ));
1560 }
1561 let _ = prev_table_info.insert(table_info);
1562 }
1563 }
1564 };
1565
1566 let l0 = &levels.l0;
1567 let mut prev_sub_level_id = u64::MAX;
1568 for sub_level in &l0.sub_levels {
            if sub_level.sub_level_id >= prev_sub_level_id {
                res.push(format!(
                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
                    group_id, sub_level.sub_level_id, prev_sub_level_id
                ));
            }
1576 prev_sub_level_id = sub_level.sub_level_id;
1577
1578 validate_level(*group_id, 0, sub_level, &mut res);
1579 }
1580
1581 for idx in 1..=levels.levels.len() {
1582 validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1583 }
1584 }
1585 res
1586}
1587
1588#[cfg(test)]
1589mod tests {
1590 use std::collections::{HashMap, HashSet};
1591
1592 use bytes::Bytes;
1593 use risingwave_common::catalog::TableId;
1594 use risingwave_common::hash::VirtualNode;
1595 use risingwave_common::util::epoch::test_epoch;
1596 use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1597
1598 use super::group_split;
1599 use crate::HummockVersionId;
1600 use crate::compaction_group::group_split::*;
1601 use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1602 use crate::key::{FullKey, gen_key_from_str};
1603 use crate::key_range::KeyRange;
1604 use crate::level::{Level, Levels, OverlappingLevel};
1605 use crate::sstable_info::{SstableInfo, SstableInfoInner};
1606 use crate::version::{
1607 GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1608 };
1609
1610 fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1611 gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1612 }
1613
1614 fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1615 let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1616 let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1617 let full_key_l = FullKey::for_test(
1618 TableId::new(*table_ids.first().unwrap()),
1619 table_key_l,
1620 epoch,
1621 )
1622 .encode();
1623 let full_key_r =
1624 FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1625 .encode();
1626
1627 SstableInfoInner {
1628 sst_id: sst_id.into(),
1629 key_range: KeyRange {
1630 left: full_key_l.into(),
1631 right: full_key_r.into(),
1632 right_exclusive: false,
1633 },
1634 table_ids,
1635 object_id: sst_id.into(),
1636 min_epoch: 20,
1637 max_epoch: 20,
1638 file_size: 100,
1639 sst_size: 100,
1640 ..Default::default()
1641 }
1642 }
1643
1644 #[test]
1645 fn test_get_sst_object_ids() {
1646 let mut version = HummockVersion {
1647 id: HummockVersionId::new(0),
1648 levels: HashMap::from_iter([(
1649 0,
1650 Levels {
1651 levels: vec![],
1652 l0: OverlappingLevel {
1653 sub_levels: vec![],
1654 total_file_size: 0,
1655 uncompressed_file_size: 0,
1656 },
1657 ..Default::default()
1658 },
1659 )]),
1660 ..Default::default()
1661 };
1662 assert_eq!(version.get_object_ids(false).count(), 0);
1663
1664 version
1666 .levels
1667 .get_mut(&0)
1668 .unwrap()
1669 .l0
1670 .sub_levels
1671 .push(Level {
1672 table_infos: vec![
1673 SstableInfoInner {
1674 object_id: 11.into(),
1675 sst_id: 11.into(),
1676 ..Default::default()
1677 }
1678 .into(),
1679 ],
1680 ..Default::default()
1681 });
1682 assert_eq!(version.get_object_ids(false).count(), 1);
1683
1684 version.levels.get_mut(&0).unwrap().levels.push(Level {
1686 table_infos: vec![
1687 SstableInfoInner {
1688 object_id: 22.into(),
1689 sst_id: 22.into(),
1690 ..Default::default()
1691 }
1692 .into(),
1693 ],
1694 ..Default::default()
1695 });
1696 assert_eq!(version.get_object_ids(false).count(), 2);
1697 }
1698
1699 #[test]
1700 fn test_apply_version_delta() {
1701 let mut version = HummockVersion {
1702 id: HummockVersionId::new(0),
1703 levels: HashMap::from_iter([
1704 (
1705 0,
1706 build_initial_compaction_group_levels(
1707 0,
1708 &CompactionConfig {
1709 max_level: 6,
1710 ..Default::default()
1711 },
1712 ),
1713 ),
1714 (
1715 1,
1716 build_initial_compaction_group_levels(
1717 1,
1718 &CompactionConfig {
1719 max_level: 6,
1720 ..Default::default()
1721 },
1722 ),
1723 ),
1724 ]),
1725 ..Default::default()
1726 };
1727 let version_delta = HummockVersionDelta {
1728 id: HummockVersionId::new(1),
1729 group_deltas: HashMap::from_iter([
1730 (
1731 2,
1732 GroupDeltas {
1733 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1734 group_config: Some(CompactionConfig {
1735 max_level: 6,
1736 ..Default::default()
1737 }),
1738 ..Default::default()
1739 }))],
1740 },
1741 ),
1742 (
1743 0,
1744 GroupDeltas {
1745 group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1746 },
1747 ),
1748 (
1749 1,
1750 GroupDeltas {
1751 group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1752 1,
1753 0,
1754 HashSet::new(),
1755 vec![
1756 SstableInfoInner {
1757 object_id: 1.into(),
1758 sst_id: 1.into(),
1759 ..Default::default()
1760 }
1761 .into(),
1762 ],
1763 0,
1764 version
1765 .levels
1766 .get(&1)
1767 .as_ref()
1768 .unwrap()
1769 .compaction_group_version_id,
1770 ))],
1771 },
1772 ),
1773 ]),
1774 ..Default::default()
1775 };
1776 let version_delta = version_delta;
1777
1778 version.apply_version_delta(&version_delta);
1779 let mut cg1 = build_initial_compaction_group_levels(
1780 1,
1781 &CompactionConfig {
1782 max_level: 6,
1783 ..Default::default()
1784 },
1785 );
1786 cg1.levels[0] = Level {
1787 level_idx: 1,
1788 level_type: LevelType::Nonoverlapping,
1789 table_infos: vec![
1790 SstableInfoInner {
1791 object_id: 1.into(),
1792 sst_id: 1.into(),
1793 ..Default::default()
1794 }
1795 .into(),
1796 ],
1797 ..Default::default()
1798 };
1799 assert_eq!(
1800 version,
1801 HummockVersion {
1802 id: HummockVersionId::new(1),
1803 levels: HashMap::from_iter([
1804 (
1805 2,
1806 build_initial_compaction_group_levels(
1807 2,
1808 &CompactionConfig {
1809 max_level: 6,
1810 ..Default::default()
1811 },
1812 ),
1813 ),
1814 (1, cg1),
1815 ]),
1816 ..Default::default()
1817 }
1818 );
1819 }
1820
1821 fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1822 gen_sst_info_impl(object_id, table_ids, left, right).into()
1823 }
1824
1825 fn gen_sst_info_impl(
1826 object_id: u64,
1827 table_ids: Vec<u32>,
1828 left: Bytes,
1829 right: Bytes,
1830 ) -> SstableInfoInner {
1831 SstableInfoInner {
1832 object_id: object_id.into(),
1833 sst_id: object_id.into(),
1834 key_range: KeyRange {
1835 left,
1836 right,
1837 right_exclusive: false,
1838 },
1839 table_ids,
1840 file_size: 100,
1841 sst_size: 100,
1842 uncompressed_file_size: 100,
1843 ..Default::default()
1844 }
1845 }
1846
1847 #[test]
1848 fn test_merge_levels() {
1849 let mut left_levels = build_initial_compaction_group_levels(
1850 1,
1851 &CompactionConfig {
1852 max_level: 6,
1853 ..Default::default()
1854 },
1855 );
1856
1857 let mut right_levels = build_initial_compaction_group_levels(
1858 2,
1859 &CompactionConfig {
1860 max_level: 6,
1861 ..Default::default()
1862 },
1863 );
1864
1865 left_levels.levels[0] = Level {
1866 level_idx: 1,
1867 level_type: LevelType::Nonoverlapping,
1868 table_infos: vec![
1869 gen_sst_info(
1870 1,
1871 vec![3],
1872 FullKey::for_test(
1873 TableId::new(3),
1874 gen_key_from_str(VirtualNode::from_index(1), "1"),
1875 0,
1876 )
1877 .encode()
1878 .into(),
1879 FullKey::for_test(
1880 TableId::new(3),
1881 gen_key_from_str(VirtualNode::from_index(200), "1"),
1882 0,
1883 )
1884 .encode()
1885 .into(),
1886 ),
1887 gen_sst_info(
1888 10,
1889 vec![3, 4],
1890 FullKey::for_test(
1891 TableId::new(3),
1892 gen_key_from_str(VirtualNode::from_index(201), "1"),
1893 0,
1894 )
1895 .encode()
1896 .into(),
1897 FullKey::for_test(
1898 TableId::new(4),
1899 gen_key_from_str(VirtualNode::from_index(10), "1"),
1900 0,
1901 )
1902 .encode()
1903 .into(),
1904 ),
1905 gen_sst_info(
1906 11,
1907 vec![4],
1908 FullKey::for_test(
1909 TableId::new(4),
1910 gen_key_from_str(VirtualNode::from_index(11), "1"),
1911 0,
1912 )
1913 .encode()
1914 .into(),
1915 FullKey::for_test(
1916 TableId::new(4),
1917 gen_key_from_str(VirtualNode::from_index(200), "1"),
1918 0,
1919 )
1920 .encode()
1921 .into(),
1922 ),
1923 ],
1924 total_file_size: 300,
1925 ..Default::default()
1926 };
1927
1928 left_levels.l0.sub_levels.push(Level {
1929 level_idx: 0,
1930 table_infos: vec![gen_sst_info(
1931 3,
1932 vec![3],
1933 FullKey::for_test(
1934 TableId::new(3),
1935 gen_key_from_str(VirtualNode::from_index(1), "1"),
1936 0,
1937 )
1938 .encode()
1939 .into(),
1940 FullKey::for_test(
1941 TableId::new(3),
1942 gen_key_from_str(VirtualNode::from_index(200), "1"),
1943 0,
1944 )
1945 .encode()
1946 .into(),
1947 )],
1948 sub_level_id: 101,
1949 level_type: LevelType::Overlapping,
1950 total_file_size: 100,
1951 ..Default::default()
1952 });
1953
1954 left_levels.l0.sub_levels.push(Level {
1955 level_idx: 0,
1956 table_infos: vec![gen_sst_info(
1957 3,
1958 vec![3],
1959 FullKey::for_test(
1960 TableId::new(3),
1961 gen_key_from_str(VirtualNode::from_index(1), "1"),
1962 0,
1963 )
1964 .encode()
1965 .into(),
1966 FullKey::for_test(
1967 TableId::new(3),
1968 gen_key_from_str(VirtualNode::from_index(200), "1"),
1969 0,
1970 )
1971 .encode()
1972 .into(),
1973 )],
1974 sub_level_id: 103,
1975 level_type: LevelType::Overlapping,
1976 total_file_size: 100,
1977 ..Default::default()
1978 });
1979
1980 left_levels.l0.sub_levels.push(Level {
1981 level_idx: 0,
1982 table_infos: vec![gen_sst_info(
1983 3,
1984 vec![3],
1985 FullKey::for_test(
1986 TableId::new(3),
1987 gen_key_from_str(VirtualNode::from_index(1), "1"),
1988 0,
1989 )
1990 .encode()
1991 .into(),
1992 FullKey::for_test(
1993 TableId::new(3),
1994 gen_key_from_str(VirtualNode::from_index(200), "1"),
1995 0,
1996 )
1997 .encode()
1998 .into(),
1999 )],
2000 sub_level_id: 105,
2001 level_type: LevelType::Nonoverlapping,
2002 total_file_size: 100,
2003 ..Default::default()
2004 });
2005
2006 right_levels.levels[0] = Level {
2007 level_idx: 1,
2008 level_type: LevelType::Nonoverlapping,
2009 table_infos: vec![
2010 gen_sst_info(
2011 1,
2012 vec![5],
2013 FullKey::for_test(
2014 TableId::new(5),
2015 gen_key_from_str(VirtualNode::from_index(1), "1"),
2016 0,
2017 )
2018 .encode()
2019 .into(),
2020 FullKey::for_test(
2021 TableId::new(5),
2022 gen_key_from_str(VirtualNode::from_index(200), "1"),
2023 0,
2024 )
2025 .encode()
2026 .into(),
2027 ),
2028 gen_sst_info(
2029 10,
2030 vec![5, 6],
2031 FullKey::for_test(
2032 TableId::new(5),
2033 gen_key_from_str(VirtualNode::from_index(201), "1"),
2034 0,
2035 )
2036 .encode()
2037 .into(),
2038 FullKey::for_test(
2039 TableId::new(6),
2040 gen_key_from_str(VirtualNode::from_index(10), "1"),
2041 0,
2042 )
2043 .encode()
2044 .into(),
2045 ),
2046 gen_sst_info(
2047 11,
2048 vec![6],
2049 FullKey::for_test(
2050 TableId::new(6),
2051 gen_key_from_str(VirtualNode::from_index(11), "1"),
2052 0,
2053 )
2054 .encode()
2055 .into(),
2056 FullKey::for_test(
2057 TableId::new(6),
2058 gen_key_from_str(VirtualNode::from_index(200), "1"),
2059 0,
2060 )
2061 .encode()
2062 .into(),
2063 ),
2064 ],
2065 total_file_size: 300,
2066 ..Default::default()
2067 };
2068
2069 right_levels.l0.sub_levels.push(Level {
2070 level_idx: 0,
2071 table_infos: vec![gen_sst_info(
2072 3,
2073 vec![5],
2074 FullKey::for_test(
2075 TableId::new(5),
2076 gen_key_from_str(VirtualNode::from_index(1), "1"),
2077 0,
2078 )
2079 .encode()
2080 .into(),
2081 FullKey::for_test(
2082 TableId::new(5),
2083 gen_key_from_str(VirtualNode::from_index(200), "1"),
2084 0,
2085 )
2086 .encode()
2087 .into(),
2088 )],
2089 sub_level_id: 101,
2090 level_type: LevelType::Overlapping,
2091 total_file_size: 100,
2092 ..Default::default()
2093 });
2094
2095 right_levels.l0.sub_levels.push(Level {
2096 level_idx: 0,
2097 table_infos: vec![gen_sst_info(
2098 5,
2099 vec![5],
2100 FullKey::for_test(
2101 TableId::new(5),
2102 gen_key_from_str(VirtualNode::from_index(1), "1"),
2103 0,
2104 )
2105 .encode()
2106 .into(),
2107 FullKey::for_test(
2108 TableId::new(5),
2109 gen_key_from_str(VirtualNode::from_index(200), "1"),
2110 0,
2111 )
2112 .encode()
2113 .into(),
2114 )],
2115 sub_level_id: 102,
2116 level_type: LevelType::Overlapping,
2117 total_file_size: 100,
2118 ..Default::default()
2119 });
2120
2121 right_levels.l0.sub_levels.push(Level {
2122 level_idx: 0,
2123 table_infos: vec![gen_sst_info(
2124 3,
2125 vec![5],
2126 FullKey::for_test(
2127 TableId::new(5),
2128 gen_key_from_str(VirtualNode::from_index(1), "1"),
2129 0,
2130 )
2131 .encode()
2132 .into(),
2133 FullKey::for_test(
2134 TableId::new(5),
2135 gen_key_from_str(VirtualNode::from_index(200), "1"),
2136 0,
2137 )
2138 .encode()
2139 .into(),
2140 )],
2141 sub_level_id: 103,
2142 level_type: LevelType::Nonoverlapping,
2143 total_file_size: 100,
2144 ..Default::default()
2145 });
2146
2147 {
2148 let mut left_levels = Levels::default();
2150 let right_levels = Levels::default();
2151
2152 group_split::merge_levels(&mut left_levels, right_levels);
2153 }
2154
        {
            let mut left_levels = build_initial_compaction_group_levels(
                1,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

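        // Merge a freshly built (empty) right group into a populated left group:
        // the left group's sub-levels and file sizes are expected to stay intact.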
        {
            let mut left_levels = left_levels.clone();
            let right_levels = build_initial_compaction_group_levels(
                2,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

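        // Merge two populated groups: sub-levels from both sides are kept
        // (six in total) and the L1 total file size is summed.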
        {
            let mut left_levels = left_levels.clone();
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 6);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(600, left_levels.levels[0].total_file_size);
        }
    }

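    // `get_split_pos` should return the index of the first SST whose key range
    // reaches the split key (table id 4 falls into the second SST); an empty
    // input yields 0.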
    #[test]
    fn test_get_split_pos() {
        let epoch = test_epoch(1);
        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
        let s3 = gen_sstable_info(3, vec![6, 7], epoch);

        let ssts = vec![s1, s2, s3];
        let split_key = group_split::build_split_key(4, VirtualNode::ZERO);

        let pos = group_split::get_split_pos(&ssts, split_key.clone());
        assert_eq!(1, pos);

        let pos = group_split::get_split_pos(&vec![], split_key);
        assert_eq!(0, pos);
    }

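    // Exercise `need_to_split` / `split_sst` on a single SST covering table ids 1, 2, 3 and 5.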
    #[test]
    fn test_split_sst() {
        let epoch = test_epoch(1);
        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);

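        // Split at table id 3: the SST is cut into a left (origin) part and a
        // right (branched) part, with table 3 becoming the first table id of
        // the branched side.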
        {
            let split_key = group_split::build_split_key(3, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(
                origin_sst
                    .key_range
                    .right
                    .cmp(&branched_sst.key_range.left)
                    .is_le()
            );
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(&3, branched_sst.table_ids.first().unwrap());
        }

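        // Split at table id 4, which is not present in the SST: the cut happens
        // before table 5, which becomes the first table id of the branched part.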
        {
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(&5, branched_sst.table_ids.first().unwrap());
        }

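        // Split key beyond the largest table id: the whole SST stays on the left.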
        {
            let split_key = group_split::build_split_key(6, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Left, split_type);
        }

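        // `need_to_split` classification: a key inside the table range yields
        // `Both`, a key at or before the first table id yields `Right`.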
        {
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Both, split_type);

            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Right, split_type);
        }

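        // Degenerate SST whose key range collapses to a single key: splitting at
        // its first table id produces only the branched (right) part.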
        {
            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
            sst.key_range.right = sst.key_range.left.clone();
            let sst: SstableInfo = sst.into();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            assert!(origin_sst.is_none());
            assert!(branched_sst.is_some());
        }
    }

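    // Exercise `split_sst_info_for_level_v2` against an L0 sub-level and an L1
    // level that span table ids 2, 3 and 4.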
    #[test]
    fn test_split_sst_info_for_level() {
        let mut version = HummockVersion {
            id: HummockVersionId(0),
            levels: HashMap::from_iter([(
                1,
                build_initial_compaction_group_levels(
                    1,
                    &CompactionConfig {
                        max_level: 6,
                        ..Default::default()
                    },
                ),
            )]),
            ..Default::default()
        };

        let cg1 = version.levels.get_mut(&1).unwrap();

        cg1.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![3],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![3, 4],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![4],
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

        cg1.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![
                gen_sst_info(
                    2,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    22,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    23,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    24,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    25,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 300,
            ..Default::default()
        });

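        // Split the L0 sub-level at table id 1 and merge the resulting
        // right-hand sub-level back into cg1.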
        {
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.l0.sub_levels[0],
                &mut new_sst_id,
                split_key,
            );
            let mut right_l0 = OverlappingLevel {
                sub_levels: vec![],
                total_file_size: 0,
                uncompressed_file_size: 0,
            };

            right_l0.sub_levels.push(Level {
                level_idx: 0,
                table_infos: x,
                sub_level_id: 101,
                total_file_size: 100,
                level_type: LevelType::Overlapping,
                ..Default::default()
            });

            let right_levels = Levels {
                levels: vec![],
                l0: right_l0,
                ..Default::default()
            };

            merge_levels(cg1, right_levels);
        }

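        // Splitting an empty level yields no SSTs.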
        {
            let mut new_sst_id = 100.into();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[2],
                &mut new_sst_id,
                split_key,
            );

            assert!(x.is_empty());
        }

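        // Split key at or below every table id in L1: all three SSTs move to
        // the right side and the level is drained.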
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(3, x.len());
            assert_eq!(1, x[0].sst_id);
            assert_eq!(100, x[0].sst_size);
            assert_eq!(10, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(11, x[2].sst_id);
            assert_eq!(100, x[2].sst_size);

            assert_eq!(0, cg1.levels[0].table_infos.len());
        }

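        // Split key beyond every table id in L1: nothing is split off and the
        // level keeps its three SSTs.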
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(5, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(0, x.len());
            assert_eq!(3, cg1.levels[0].table_infos.len());
        }

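        // Split at table id 4: the SST spanning tables 3 and 4 is split in
        // half, the table-4-only SST moves right, and the table-3-only SST
        // stays left.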
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(2, x.len());
            assert_eq!(100, x[0].sst_id);
            assert_eq!(100 / 2, x[0].sst_size);
            assert_eq!(11, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(vec![4], x[1].table_ids);

            assert_eq!(2, cg1.levels[0].table_infos.len());
            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
            assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
        }
    }
}