1use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_pb::hummock::{
27 CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
28};
29use tracing::warn;
30
31use super::group_split::split_sst_with_table_ids;
32use super::{StateTableId, group_split};
33use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
34use crate::compact_task::is_compaction_task_expired;
35use crate::compaction_group::StaticCompactionGroupId;
36use crate::key_range::KeyRangeCommon;
37use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
38use crate::sstable_info::SstableInfo;
39use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
40use crate::vector_index::apply_vector_index_delta;
41use crate::version::{
42 GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
43 IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
44};
45use crate::{
46 CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
47};
48
49#[derive(Debug, Clone, Default)]
50pub struct SstDeltaInfo {
51 pub insert_sst_level: u32,
52 pub insert_sst_infos: Vec<SstableInfo>,
53 pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
54}
55
56pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
57
58impl<L> HummockVersionCommon<SstableInfo, L> {
59 pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
60 self.levels
61 .get(&compaction_group_id)
62 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
63 }
64
65 pub fn get_compaction_group_levels_mut(
66 &mut self,
67 compaction_group_id: CompactionGroupId,
68 ) -> &mut Levels {
69 self.levels
70 .get_mut(&compaction_group_id)
71 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
72 }
73
74 pub fn get_sst_ids_by_group_id(
76 &self,
77 compaction_group_id: CompactionGroupId,
78 ) -> impl Iterator<Item = HummockSstableId> + '_ {
79 self.levels
80 .iter()
81 .filter_map(move |(cg_id, level)| {
82 if *cg_id == compaction_group_id {
83 Some(level)
84 } else {
85 None
86 }
87 })
88 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
89 .flat_map(|level| level.table_infos.iter())
90 .map(|s| s.sst_id)
91 }
92
93 pub fn level_iter<F: FnMut(&Level) -> bool>(
94 &self,
95 compaction_group_id: CompactionGroupId,
96 mut f: F,
97 ) {
98 if let Some(levels) = self.levels.get(&compaction_group_id) {
99 for sub_level in &levels.l0.sub_levels {
100 if !f(sub_level) {
101 return;
102 }
103 }
104 for level in &levels.levels {
105 if !f(level) {
106 return;
107 }
108 }
109 }
110 }
111
112 pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
113 self.levels
115 .get(&compaction_group_id)
116 .map(|group| group.levels.len() + 1)
117 .unwrap_or(0)
118 }
119
120 pub fn safe_epoch_table_watermarks(
121 &self,
122 existing_table_ids: &[u32],
123 ) -> BTreeMap<u32, TableWatermarks> {
124 safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
125 }
126}
127
128pub fn safe_epoch_table_watermarks_impl(
129 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
130 existing_table_ids: &[u32],
131) -> BTreeMap<u32, TableWatermarks> {
132 fn extract_single_table_watermark(
133 table_watermarks: &TableWatermarks,
134 ) -> Option<TableWatermarks> {
135 if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
136 Some(TableWatermarks {
137 watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
138 direction: table_watermarks.direction,
139 watermark_type: table_watermarks.watermark_type,
140 })
141 } else {
142 None
143 }
144 }
145 table_watermarks
146 .iter()
147 .filter_map(|(table_id, table_watermarks)| {
148 let u32_table_id = table_id.table_id();
149 if !existing_table_ids.contains(&u32_table_id) {
150 None
151 } else {
152 extract_single_table_watermark(table_watermarks)
153 .map(|table_watermarks| (table_id.table_id, table_watermarks))
154 }
155 })
156 .collect()
157}
158
159pub fn safe_epoch_read_table_watermarks_impl(
160 safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
161) -> BTreeMap<TableId, ReadTableWatermark> {
162 safe_epoch_watermarks
163 .into_iter()
164 .map(|(table_id, watermarks)| {
165 assert_eq!(watermarks.watermarks.len(), 1);
166 let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
167 let mut vnode_watermark_map = BTreeMap::new();
168 for vnode_watermark in vnode_watermarks.iter() {
169 let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
170 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
171 assert!(
172 vnode_watermark_map
173 .insert(vnode, watermark.clone())
174 .is_none(),
175 "duplicate table watermark on vnode {}",
176 vnode.to_index()
177 );
178 }
179 }
180 (
181 TableId::from(table_id),
182 ReadTableWatermark {
183 direction: watermarks.direction,
184 vnode_watermarks: vnode_watermark_map,
185 },
186 )
187 })
188 .collect()
189}
190
191impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
192 pub fn count_new_ssts_in_group_split(
193 &self,
194 parent_group_id: CompactionGroupId,
195 split_key: Bytes,
196 ) -> u64 {
197 self.levels
198 .get(&parent_group_id)
199 .map_or(0, |parent_levels| {
200 let l0 = &parent_levels.l0;
201 let mut split_count = 0;
202 for sub_level in &l0.sub_levels {
203 assert!(!sub_level.table_infos.is_empty());
204
205 if sub_level.level_type == PbLevelType::Overlapping {
206 split_count += sub_level
208 .table_infos
209 .iter()
210 .map(|sst| {
211 if let group_split::SstSplitType::Both =
212 group_split::need_to_split(sst, split_key.clone())
213 {
214 2
215 } else {
216 0
217 }
218 })
219 .sum::<u64>();
220 continue;
221 }
222
223 let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
224 let sst = sub_level.table_infos.get(pos).unwrap();
225
226 if let group_split::SstSplitType::Both =
227 group_split::need_to_split(sst, split_key.clone())
228 {
229 split_count += 2;
230 }
231 }
232
233 for level in &parent_levels.levels {
234 if level.table_infos.is_empty() {
235 continue;
236 }
237 let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
238 let sst = level.table_infos.get(pos).unwrap();
239 if let group_split::SstSplitType::Both =
240 group_split::need_to_split(sst, split_key.clone())
241 {
242 split_count += 2;
243 }
244 }
245
246 split_count
247 })
248 }
249
250 pub fn init_with_parent_group(
251 &mut self,
252 parent_group_id: CompactionGroupId,
253 group_id: CompactionGroupId,
254 member_table_ids: BTreeSet<StateTableId>,
255 new_sst_start_id: HummockSstableId,
256 ) {
257 let mut new_sst_id = new_sst_start_id;
258 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
259 if new_sst_start_id != 0 {
260 if cfg!(debug_assertions) {
261 panic!(
262 "non-zero sst start id {} for NewCompactionGroup",
263 new_sst_start_id
264 );
265 } else {
266 warn!(
267 %new_sst_start_id,
268 "non-zero sst start id for NewCompactionGroup"
269 );
270 }
271 }
272 return;
273 } else if !self.levels.contains_key(&parent_group_id) {
274 unreachable!(
275 "non-existing parent group id {} to init from",
276 parent_group_id
277 );
278 }
279 let [parent_levels, cur_levels] = self
280 .levels
281 .get_disjoint_mut([&parent_group_id, &group_id])
282 .map(|res| res.unwrap());
283 parent_levels.compaction_group_version_id += 1;
286 cur_levels.compaction_group_version_id += 1;
287 let l0 = &mut parent_levels.l0;
288 {
289 for sub_level in &mut l0.sub_levels {
290 let target_l0 = &mut cur_levels.l0;
291 let insert_table_infos =
294 split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
295 sub_level
296 .table_infos
297 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
298 .for_each(|sst_info| {
299 sub_level.total_file_size -= sst_info.sst_size;
300 sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
301 l0.total_file_size -= sst_info.sst_size;
302 l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
303 });
304 if insert_table_infos.is_empty() {
305 continue;
306 }
307 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
308 Ok(idx) => {
309 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
310 }
311 Err(idx) => {
312 insert_new_sub_level(
313 target_l0,
314 sub_level.sub_level_id,
315 sub_level.level_type,
316 insert_table_infos,
317 Some(idx),
318 );
319 }
320 }
321 }
322
323 l0.sub_levels.retain(|level| !level.table_infos.is_empty());
324 }
325 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
326 let insert_table_infos =
327 split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
328 cur_levels.levels[idx].total_file_size += insert_table_infos
329 .iter()
330 .map(|sst| sst.sst_size)
331 .sum::<u64>();
332 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
333 .iter()
334 .map(|sst| sst.uncompressed_file_size)
335 .sum::<u64>();
336 cur_levels.levels[idx]
337 .table_infos
338 .extend(insert_table_infos);
339 cur_levels.levels[idx]
340 .table_infos
341 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
342 assert!(can_concat(&cur_levels.levels[idx].table_infos));
343 level
344 .table_infos
345 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
346 .for_each(|sst_info| {
347 level.total_file_size -= sst_info.sst_size;
348 level.uncompressed_file_size -= sst_info.uncompressed_file_size;
349 });
350 }
351
352 assert!(
353 parent_levels
354 .l0
355 .sub_levels
356 .iter()
357 .all(|level| !level.table_infos.is_empty())
358 );
359 assert!(
360 cur_levels
361 .l0
362 .sub_levels
363 .iter()
364 .all(|level| !level.table_infos.is_empty())
365 );
366 }
367
368 pub fn build_sst_delta_infos(
369 &self,
370 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
371 ) -> Vec<SstDeltaInfo> {
372 let mut infos = vec![];
373
374 if version_delta.trivial_move {
377 return infos;
378 }
379
380 for (group_id, group_deltas) in &version_delta.group_deltas {
381 let mut info = SstDeltaInfo::default();
382
383 let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
384 let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
385
386 if !group_deltas.group_deltas.iter().all(|delta| {
388 matches!(
389 delta,
390 GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
391 )
392 }) {
393 continue;
394 }
395
396 for group_delta in &group_deltas.group_deltas {
400 match group_delta {
401 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
402 if !inserted_table_infos.is_empty() {
403 info.insert_sst_level = 0;
404 info.insert_sst_infos
405 .extend(inserted_table_infos.iter().cloned());
406 }
407 }
408 GroupDeltaCommon::IntraLevel(intra_level) => {
409 if !intra_level.inserted_table_infos.is_empty() {
410 info.insert_sst_level = intra_level.level_idx;
411 info.insert_sst_infos
412 .extend(intra_level.inserted_table_infos.iter().cloned());
413 }
414 if !intra_level.removed_table_ids.is_empty() {
415 for id in &intra_level.removed_table_ids {
416 if intra_level.level_idx == 0 {
417 removed_l0_ssts.insert(*id);
418 } else {
419 removed_ssts
420 .entry(intra_level.level_idx)
421 .or_default()
422 .insert(*id);
423 }
424 }
425 }
426 }
427 GroupDeltaCommon::GroupConstruct(_)
428 | GroupDeltaCommon::GroupDestroy(_)
429 | GroupDeltaCommon::GroupMerge(_) => {}
430 }
431 }
432
433 let group = self.levels.get(group_id).unwrap();
434 for l0_sub_level in &group.level0().sub_levels {
435 for sst_info in &l0_sub_level.table_infos {
436 if removed_l0_ssts.remove(&sst_info.sst_id) {
437 info.delete_sst_object_ids.push(sst_info.object_id);
438 }
439 }
440 }
441 for level in &group.levels {
442 if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
443 for sst_info in &level.table_infos {
444 if removed_level_ssts.remove(&sst_info.sst_id) {
445 info.delete_sst_object_ids.push(sst_info.object_id);
446 }
447 }
448 if !removed_level_ssts.is_empty() {
449 tracing::error!(
450 "removed_level_ssts is not empty: {:?}",
451 removed_level_ssts,
452 );
453 }
454 debug_assert!(removed_level_ssts.is_empty());
455 }
456 }
457
458 if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
459 tracing::error!(
460 "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
461 removed_l0_ssts,
462 removed_ssts
463 );
464 }
465 debug_assert!(removed_l0_ssts.is_empty());
466 debug_assert!(removed_ssts.is_empty());
467
468 infos.push(info);
469 }
470
471 infos
472 }
473
474 pub fn apply_version_delta(
475 &mut self,
476 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
477 ) {
478 assert_eq!(self.id, version_delta.prev_id);
479
480 let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
481 &version_delta.state_table_info_delta,
482 &version_delta.removed_table_ids,
483 );
484
485 #[expect(deprecated)]
486 {
487 if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
488 is_commit_epoch = true;
489 tracing::trace!(
490 "max committed epoch bumped but no table committed epoch is changed"
491 );
492 }
493 }
494
495 for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
497 let mut is_applied_l0_compact = false;
498 for group_delta in &group_deltas.group_deltas {
499 match group_delta {
500 GroupDeltaCommon::GroupConstruct(group_construct) => {
501 let mut new_levels = build_initial_compaction_group_levels(
502 *compaction_group_id,
503 group_construct.get_group_config().unwrap(),
504 );
505 let parent_group_id = group_construct.parent_group_id;
506 new_levels.parent_group_id = parent_group_id;
507 #[expect(deprecated)]
508 new_levels
510 .member_table_ids
511 .clone_from(&group_construct.table_ids);
512 self.levels.insert(*compaction_group_id, new_levels);
513 let member_table_ids = if group_construct.version()
514 >= CompatibilityVersion::NoMemberTableIds
515 {
516 self.state_table_info
517 .compaction_group_member_table_ids(*compaction_group_id)
518 .iter()
519 .map(|table_id| table_id.table_id)
520 .collect()
521 } else {
522 #[expect(deprecated)]
523 BTreeSet::from_iter(group_construct.table_ids.clone())
525 };
526
527 if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
528 let split_key = if group_construct.split_key.is_some() {
529 Some(Bytes::from(group_construct.split_key.clone().unwrap()))
530 } else {
531 None
532 };
533 self.init_with_parent_group_v2(
534 parent_group_id,
535 *compaction_group_id,
536 group_construct.get_new_sst_start_id().into(),
537 split_key.clone(),
538 );
539 } else {
540 self.init_with_parent_group(
542 parent_group_id,
543 *compaction_group_id,
544 member_table_ids,
545 group_construct.get_new_sst_start_id().into(),
546 );
547 }
548 }
549 GroupDeltaCommon::GroupMerge(group_merge) => {
550 tracing::info!(
551 "group_merge left {:?} right {:?}",
552 group_merge.left_group_id,
553 group_merge.right_group_id
554 );
555 self.merge_compaction_group(
556 group_merge.left_group_id,
557 group_merge.right_group_id,
558 )
559 }
560 GroupDeltaCommon::IntraLevel(level_delta) => {
561 let levels =
562 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
563 panic!("compaction group {} does not exist", compaction_group_id)
564 });
565 if is_commit_epoch {
566 assert!(
567 level_delta.removed_table_ids.is_empty(),
568 "no sst should be deleted when committing an epoch"
569 );
570
571 let IntraLevelDelta {
572 level_idx,
573 l0_sub_level_id,
574 inserted_table_infos,
575 ..
576 } = level_delta;
577 {
578 assert_eq!(
579 *level_idx, 0,
580 "we should only add to L0 when we commit an epoch."
581 );
582 if !inserted_table_infos.is_empty() {
583 insert_new_sub_level(
584 &mut levels.l0,
585 *l0_sub_level_id,
586 PbLevelType::Overlapping,
587 inserted_table_infos.clone(),
588 None,
589 );
590 }
591 }
592 } else {
593 levels.apply_compact_ssts(
595 level_delta,
596 self.state_table_info
597 .compaction_group_member_table_ids(*compaction_group_id),
598 );
599 if level_delta.level_idx == 0 {
600 is_applied_l0_compact = true;
601 }
602 }
603 }
604 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
605 let levels =
606 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
607 panic!("compaction group {} does not exist", compaction_group_id)
608 });
609 assert!(is_commit_epoch);
610
611 if !inserted_table_infos.is_empty() {
612 let next_l0_sub_level_id = levels
613 .l0
614 .sub_levels
615 .last()
616 .map(|level| level.sub_level_id + 1)
617 .unwrap_or(1);
618
619 insert_new_sub_level(
620 &mut levels.l0,
621 next_l0_sub_level_id,
622 PbLevelType::Overlapping,
623 inserted_table_infos.clone(),
624 None,
625 );
626 }
627 }
628 GroupDeltaCommon::GroupDestroy(_) => {
629 self.levels.remove(compaction_group_id);
630 }
631 }
632 }
633 if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
634 {
635 levels.post_apply_l0_compact();
636 }
637 }
638 self.id = version_delta.id;
639 #[expect(deprecated)]
640 {
641 self.max_committed_epoch = version_delta.max_committed_epoch;
642 }
643
644 let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
648 HashMap::new();
649
650 for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
652 if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
653 if version_delta.removed_table_ids.contains(table_id) {
654 modified_table_watermarks.insert(*table_id, None);
655 } else {
656 let mut current_table_watermarks = (**current_table_watermarks).clone();
657 current_table_watermarks.apply_new_table_watermarks(table_watermarks);
658 modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
659 }
660 } else {
661 modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
662 }
663 }
664 for (table_id, table_watermarks) in &self.table_watermarks {
665 let safe_epoch = if let Some(state_table_info) =
666 self.state_table_info.info().get(table_id)
667 && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
668 && state_table_info.committed_epoch > *oldest_epoch
669 {
670 state_table_info.committed_epoch
672 } else {
673 continue;
675 };
676 let table_watermarks = modified_table_watermarks
677 .entry(*table_id)
678 .or_insert_with(|| Some((**table_watermarks).clone()));
679 if let Some(table_watermarks) = table_watermarks {
680 table_watermarks.clear_stale_epoch_watermark(safe_epoch);
681 }
682 }
683 for (table_id, table_watermarks) in modified_table_watermarks {
685 if let Some(table_watermarks) = table_watermarks {
686 self.table_watermarks
687 .insert(table_id, Arc::new(table_watermarks));
688 } else {
689 self.table_watermarks.remove(&table_id);
690 }
691 }
692
693 Self::apply_change_log_delta(
695 &mut self.table_change_log,
696 &version_delta.change_log_delta,
697 &version_delta.removed_table_ids,
698 &version_delta.state_table_info_delta,
699 &changed_table_info,
700 );
701
702 apply_vector_index_delta(
704 &mut self.vector_indexes,
705 &version_delta.vector_index_delta,
706 &version_delta.removed_table_ids,
707 );
708 }
709
710 pub fn apply_change_log_delta<T: Clone>(
711 table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
712 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
713 removed_table_ids: &HashSet<TableId>,
714 state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
715 changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
716 ) {
717 for (table_id, change_log_delta) in change_log_delta {
718 let new_change_log = &change_log_delta.new_log;
719 match table_change_log.entry(*table_id) {
720 Entry::Occupied(entry) => {
721 let change_log = entry.into_mut();
722 change_log.add_change_log(new_change_log.clone());
723 }
724 Entry::Vacant(entry) => {
725 entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
726 }
727 };
728 }
729
730 table_change_log.retain(|table_id, _| {
734 if removed_table_ids.contains(table_id) {
735 return false;
736 }
737 if let Some(table_info_delta) = state_table_info_delta.get(table_id)
738 && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
739 } else {
741 return true;
743 }
744 let contains = change_log_delta.contains_key(table_id);
745 if !contains {
746 warn!(
747 ?table_id,
748 "table change log dropped due to no further change log at newly committed epoch",
749 );
750 }
751 contains
752 });
753
754 for (table_id, change_log_delta) in change_log_delta {
756 if let Some(change_log) = table_change_log.get_mut(table_id) {
757 change_log.truncate(change_log_delta.truncate_epoch);
758 }
759 }
760 }
761
762 pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
763 let mut ret: BTreeMap<_, _> = BTreeMap::new();
764 for (compaction_group_id, group) in &self.levels {
765 let mut levels = vec![];
766 levels.extend(group.l0.sub_levels.iter());
767 levels.extend(group.levels.iter());
768 for level in levels {
769 for table_info in &level.table_infos {
770 if table_info.sst_id.inner() == table_info.object_id.inner() {
771 continue;
772 }
773 let object_id = table_info.object_id;
774 let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
775 entry
776 .entry(*compaction_group_id)
777 .or_default()
778 .push(table_info.sst_id)
779 }
780 }
781 }
782 ret
783 }
784
785 pub fn merge_compaction_group(
786 &mut self,
787 left_group_id: CompactionGroupId,
788 right_group_id: CompactionGroupId,
789 ) {
790 let left_group_id_table_ids = self
792 .state_table_info
793 .compaction_group_member_table_ids(left_group_id)
794 .iter()
795 .map(|table_id| table_id.table_id);
796 let right_group_id_table_ids = self
797 .state_table_info
798 .compaction_group_member_table_ids(right_group_id)
799 .iter()
800 .map(|table_id| table_id.table_id);
801
802 assert!(
803 left_group_id_table_ids
804 .chain(right_group_id_table_ids)
805 .is_sorted()
806 );
807
808 let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
809 let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
810 panic!(
811 "compaction group should exist right {} all {:?}",
812 right_group_id, total_cg
813 )
814 });
815
816 let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
817 panic!(
818 "compaction group should exist left {} all {:?}",
819 left_group_id, total_cg
820 )
821 });
822
823 group_split::merge_levels(left_levels, right_levels);
824 }
825
826 pub fn init_with_parent_group_v2(
827 &mut self,
828 parent_group_id: CompactionGroupId,
829 group_id: CompactionGroupId,
830 new_sst_start_id: HummockSstableId,
831 split_key: Option<Bytes>,
832 ) {
833 let mut new_sst_id = new_sst_start_id;
834 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
835 if new_sst_start_id != 0 {
836 if cfg!(debug_assertions) {
837 panic!(
838 "non-zero sst start id {} for NewCompactionGroup",
839 new_sst_start_id
840 );
841 } else {
842 warn!(
843 %new_sst_start_id,
844 "non-zero sst start id for NewCompactionGroup"
845 );
846 }
847 }
848 return;
849 } else if !self.levels.contains_key(&parent_group_id) {
850 unreachable!(
851 "non-existing parent group id {} to init from (V2)",
852 parent_group_id
853 );
854 }
855
856 let [parent_levels, cur_levels] = self
857 .levels
858 .get_disjoint_mut([&parent_group_id, &group_id])
859 .map(|res| res.unwrap());
860 parent_levels.compaction_group_version_id += 1;
863 cur_levels.compaction_group_version_id += 1;
864
865 let l0 = &mut parent_levels.l0;
866 {
867 for sub_level in &mut l0.sub_levels {
868 let target_l0 = &mut cur_levels.l0;
869 let insert_table_infos = if let Some(split_key) = &split_key {
872 group_split::split_sst_info_for_level_v2(
873 sub_level,
874 &mut new_sst_id,
875 split_key.clone(),
876 )
877 } else {
878 vec![]
879 };
880
881 if insert_table_infos.is_empty() {
882 continue;
883 }
884
885 sub_level
886 .table_infos
887 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
888 .for_each(|sst_info| {
889 sub_level.total_file_size -= sst_info.sst_size;
890 sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
891 l0.total_file_size -= sst_info.sst_size;
892 l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
893 });
894 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
895 Ok(idx) => {
896 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
897 }
898 Err(idx) => {
899 insert_new_sub_level(
900 target_l0,
901 sub_level.sub_level_id,
902 sub_level.level_type,
903 insert_table_infos,
904 Some(idx),
905 );
906 }
907 }
908 }
909
910 l0.sub_levels.retain(|level| !level.table_infos.is_empty());
911 }
912
913 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
914 let insert_table_infos = if let Some(split_key) = &split_key {
915 group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
916 } else {
917 vec![]
918 };
919
920 if insert_table_infos.is_empty() {
921 continue;
922 }
923
924 cur_levels.levels[idx].total_file_size += insert_table_infos
925 .iter()
926 .map(|sst| sst.sst_size)
927 .sum::<u64>();
928 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
929 .iter()
930 .map(|sst| sst.uncompressed_file_size)
931 .sum::<u64>();
932 cur_levels.levels[idx]
933 .table_infos
934 .extend(insert_table_infos);
935 cur_levels.levels[idx]
936 .table_infos
937 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
938 assert!(can_concat(&cur_levels.levels[idx].table_infos));
939 level
940 .table_infos
941 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
942 .for_each(|sst_info| {
943 level.total_file_size -= sst_info.sst_size;
944 level.uncompressed_file_size -= sst_info.uncompressed_file_size;
945 });
946 }
947
948 assert!(
949 parent_levels
950 .l0
951 .sub_levels
952 .iter()
953 .all(|level| !level.table_infos.is_empty())
954 );
955 assert!(
956 cur_levels
957 .l0
958 .sub_levels
959 .iter()
960 .all(|level| !level.table_infos.is_empty())
961 );
962 }
963}
964
965impl<T> HummockVersionCommon<T>
966where
967 T: SstableIdReader + ObjectIdReader,
968{
969 pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
970 self.get_sst_infos(exclude_change_log)
971 .map(|s| s.object_id())
972 .collect()
973 }
974
975 pub fn get_object_ids(
976 &self,
977 exclude_change_log: bool,
978 ) -> impl Iterator<Item = HummockObjectId> + '_ {
979 match HummockObjectId::Sstable(0.into()) {
983 HummockObjectId::Sstable(_) => {}
984 HummockObjectId::VectorFile(_) => {}
985 };
986 self.get_sst_infos(exclude_change_log)
987 .map(|s| HummockObjectId::Sstable(s.object_id()))
988 .chain(
989 self.vector_indexes
990 .values()
991 .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
992 )
993 }
994
995 pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
996 self.get_sst_infos(exclude_change_log)
997 .map(|s| s.sst_id())
998 .collect()
999 }
1000
1001 pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
1002 let may_table_change_log = if exclude_change_log {
1003 None
1004 } else {
1005 Some(self.table_change_log.values())
1006 };
1007 self.get_combined_levels()
1008 .flat_map(|level| level.table_infos.iter())
1009 .chain(
1010 may_table_change_log
1011 .map(|v| {
1012 v.flat_map(|table_change_log| {
1013 table_change_log.iter().flat_map(|epoch_change_log| {
1014 epoch_change_log
1015 .old_value
1016 .iter()
1017 .chain(epoch_change_log.new_value.iter())
1018 })
1019 })
1020 })
1021 .into_iter()
1022 .flatten(),
1023 )
1024 }
1025}
1026
1027impl Levels {
1028 pub(crate) fn apply_compact_ssts(
1029 &mut self,
1030 level_delta: &IntraLevelDeltaCommon<SstableInfo>,
1031 member_table_ids: &BTreeSet<TableId>,
1032 ) {
1033 let IntraLevelDeltaCommon {
1034 level_idx,
1035 l0_sub_level_id,
1036 inserted_table_infos: insert_table_infos,
1037 vnode_partition_count,
1038 removed_table_ids: delete_sst_ids_set,
1039 compaction_group_version_id,
1040 } = level_delta;
1041 let new_vnode_partition_count = *vnode_partition_count;
1042
1043 if is_compaction_task_expired(
1044 self.compaction_group_version_id,
1045 *compaction_group_version_id,
1046 ) {
1047 warn!(
1048 current_compaction_group_version_id = self.compaction_group_version_id,
1049 delta_compaction_group_version_id = compaction_group_version_id,
1050 level_idx,
1051 l0_sub_level_id,
1052 insert_table_infos = ?insert_table_infos
1053 .iter()
1054 .map(|sst| (sst.sst_id, sst.object_id))
1055 .collect_vec(),
1056 ?delete_sst_ids_set,
1057 "This VersionDelta may be committed by an expired compact task. Please check it."
1058 );
1059 return;
1060 }
1061 if !delete_sst_ids_set.is_empty() {
1062 if *level_idx == 0 {
1063 for level in &mut self.l0.sub_levels {
1064 level_delete_ssts(level, delete_sst_ids_set);
1065 }
1066 } else {
1067 let idx = *level_idx as usize - 1;
1068 level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
1069 }
1070 }
1071
1072 if !insert_table_infos.is_empty() {
1073 let insert_sst_level_id = *level_idx;
1074 let insert_sub_level_id = *l0_sub_level_id;
1075 if insert_sst_level_id == 0 {
1076 let l0 = &mut self.l0;
1077 let index = l0
1078 .sub_levels
1079 .partition_point(|level| level.sub_level_id < insert_sub_level_id);
1080 assert!(
1081 index < l0.sub_levels.len()
1082 && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
1083 "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
1084 insert_sub_level_id,
1085 delete_sst_ids_set,
1086 l0.sub_levels
1087 .iter()
1088 .map(|level| level.sub_level_id)
1089 .collect_vec()
1090 );
1091 if l0.sub_levels[index].table_infos.is_empty()
1092 && member_table_ids.len() == 1
1093 && insert_table_infos.iter().all(|sst| {
1094 sst.table_ids.len() == 1
1095 && sst.table_ids[0]
1096 == member_table_ids.iter().next().expect("non-empty").table_id
1097 })
1098 {
1099 l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
1102 }
1103 level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
1104 } else {
1105 let idx = insert_sst_level_id as usize - 1;
1106 if self.levels[idx].table_infos.is_empty()
1107 && insert_table_infos
1108 .iter()
1109 .all(|sst| sst.table_ids.len() == 1)
1110 {
1111 self.levels[idx].vnode_partition_count = new_vnode_partition_count;
1112 } else if self.levels[idx].vnode_partition_count != 0
1113 && new_vnode_partition_count == 0
1114 && member_table_ids.len() > 1
1115 {
1116 self.levels[idx].vnode_partition_count = 0;
1117 }
1118 level_insert_ssts(&mut self.levels[idx], insert_table_infos);
1119 }
1120 }
1121 }
1122
1123 pub(crate) fn post_apply_l0_compact(&mut self) {
1124 {
1125 self.l0
1126 .sub_levels
1127 .retain(|level| !level.table_infos.is_empty());
1128 self.l0.total_file_size = self
1129 .l0
1130 .sub_levels
1131 .iter()
1132 .map(|level| level.total_file_size)
1133 .sum::<u64>();
1134 self.l0.uncompressed_file_size = self
1135 .l0
1136 .sub_levels
1137 .iter()
1138 .map(|level| level.uncompressed_file_size)
1139 .sum::<u64>();
1140 }
1141 }
1142}
1143
1144impl<T, L> HummockVersionCommon<T, L> {
1145 pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1146 self.levels
1147 .values()
1148 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1149 }
1150}
1151
1152pub fn build_initial_compaction_group_levels(
1153 group_id: CompactionGroupId,
1154 compaction_config: &CompactionConfig,
1155) -> Levels {
1156 let mut levels = vec![];
1157 for l in 0..compaction_config.get_max_level() {
1158 levels.push(Level {
1159 level_idx: (l + 1) as u32,
1160 level_type: PbLevelType::Nonoverlapping,
1161 table_infos: vec![],
1162 total_file_size: 0,
1163 sub_level_id: 0,
1164 uncompressed_file_size: 0,
1165 vnode_partition_count: 0,
1166 });
1167 }
1168 #[expect(deprecated)] Levels {
1170 levels,
1171 l0: OverlappingLevel {
1172 sub_levels: vec![],
1173 total_file_size: 0,
1174 uncompressed_file_size: 0,
1175 },
1176 group_id,
1177 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
1178 member_table_ids: vec![],
1179 compaction_group_version_id: 0,
1180 }
1181}
1182
1183fn split_sst_info_for_level(
1184 member_table_ids: &BTreeSet<u32>,
1185 level: &mut Level,
1186 new_sst_id: &mut HummockSstableId,
1187) -> Vec<SstableInfo> {
1188 let mut insert_table_infos = vec![];
1191 for sst_info in &mut level.table_infos {
1192 let removed_table_ids = sst_info
1193 .table_ids
1194 .iter()
1195 .filter(|table_id| member_table_ids.contains(table_id))
1196 .cloned()
1197 .collect_vec();
1198 let sst_size = sst_info.sst_size;
1199 if sst_size / 2 == 0 {
1200 tracing::warn!(
1201 id = %sst_info.sst_id,
1202 object_id = %sst_info.object_id,
1203 sst_size = sst_info.sst_size,
1204 file_size = sst_info.file_size,
1205 "Sstable sst_size is under expected",
1206 );
1207 };
1208 if !removed_table_ids.is_empty() {
1209 let (modified_sst, branch_sst) = split_sst_with_table_ids(
1210 sst_info,
1211 new_sst_id,
1212 sst_size / 2,
1213 sst_size / 2,
1214 member_table_ids.iter().cloned().collect_vec(),
1215 );
1216 *sst_info = modified_sst;
1217 insert_table_infos.push(branch_sst);
1218 }
1219 }
1220 insert_table_infos
1221}
1222
1223pub fn get_compaction_group_ids(
1225 version: &HummockVersion,
1226) -> impl Iterator<Item = CompactionGroupId> + '_ {
1227 version.levels.keys().cloned()
1228}
1229
1230pub fn get_table_compaction_group_id_mapping(
1231 version: &HummockVersion,
1232) -> HashMap<StateTableId, CompactionGroupId> {
1233 version
1234 .state_table_info
1235 .info()
1236 .iter()
1237 .map(|(table_id, info)| (table_id.table_id, info.compaction_group_id))
1238 .collect()
1239}
1240
1241pub fn get_compaction_group_ssts(
1243 version: &HummockVersion,
1244 group_id: CompactionGroupId,
1245) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1246 let group_levels = version.get_compaction_group_levels(group_id);
1247 group_levels
1248 .l0
1249 .sub_levels
1250 .iter()
1251 .rev()
1252 .chain(group_levels.levels.iter())
1253 .flat_map(|level| {
1254 level
1255 .table_infos
1256 .iter()
1257 .map(|table_info| (table_info.object_id, table_info.sst_id))
1258 })
1259}
1260
1261pub fn new_sub_level(
1262 sub_level_id: u64,
1263 level_type: PbLevelType,
1264 table_infos: Vec<SstableInfo>,
1265) -> Level {
1266 if level_type == PbLevelType::Nonoverlapping {
1267 debug_assert!(
1268 can_concat(&table_infos),
1269 "sst of non-overlapping level is not concat-able: {:?}",
1270 table_infos
1271 );
1272 }
1273 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1274 let uncompressed_file_size = table_infos
1275 .iter()
1276 .map(|table| table.uncompressed_file_size)
1277 .sum();
1278 Level {
1279 level_idx: 0,
1280 level_type,
1281 table_infos,
1282 total_file_size,
1283 sub_level_id,
1284 uncompressed_file_size,
1285 vnode_partition_count: 0,
1286 }
1287}
1288
1289pub fn add_ssts_to_sub_level(
1290 l0: &mut OverlappingLevel,
1291 sub_level_idx: usize,
1292 insert_table_infos: Vec<SstableInfo>,
1293) {
1294 insert_table_infos.iter().for_each(|sst| {
1295 l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1296 l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1297 l0.total_file_size += sst.sst_size;
1298 l0.uncompressed_file_size += sst.uncompressed_file_size;
1299 });
1300 l0.sub_levels[sub_level_idx]
1301 .table_infos
1302 .extend(insert_table_infos);
1303 if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1304 l0.sub_levels[sub_level_idx]
1305 .table_infos
1306 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1307 assert!(
1308 can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1309 "sstable ids: {:?}",
1310 l0.sub_levels[sub_level_idx]
1311 .table_infos
1312 .iter()
1313 .map(|sst| sst.sst_id)
1314 .collect_vec()
1315 );
1316 }
1317}
1318
1319pub fn insert_new_sub_level(
1321 l0: &mut OverlappingLevel,
1322 insert_sub_level_id: u64,
1323 level_type: PbLevelType,
1324 insert_table_infos: Vec<SstableInfo>,
1325 sub_level_insert_hint: Option<usize>,
1326) {
1327 if insert_sub_level_id == u64::MAX {
1328 return;
1329 }
1330 let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1331 insert_pos
1332 } else {
1333 if let Some(newest_level) = l0.sub_levels.last() {
1334 assert!(
1335 newest_level.sub_level_id < insert_sub_level_id,
1336 "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1337 newest_level.sub_level_id,
1338 insert_sub_level_id,
1339 l0,
1340 );
1341 }
1342 l0.sub_levels.len()
1343 };
1344 #[cfg(debug_assertions)]
1345 {
1346 if insert_pos > 0 {
1347 if let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1) {
1348 debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1349 }
1350 }
1351 if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1352 debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1353 }
1354 }
1355 let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1358 l0.total_file_size += level.total_file_size;
1359 l0.uncompressed_file_size += level.uncompressed_file_size;
1360 l0.sub_levels.insert(insert_pos, level);
1361}
1362
1363fn level_delete_ssts(
1367 operand: &mut Level,
1368 delete_sst_ids_superset: &HashSet<HummockSstableId>,
1369) -> bool {
1370 let original_len = operand.table_infos.len();
1371 operand
1372 .table_infos
1373 .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1374 operand.total_file_size = operand
1375 .table_infos
1376 .iter()
1377 .map(|table| table.sst_size)
1378 .sum::<u64>();
1379 operand.uncompressed_file_size = operand
1380 .table_infos
1381 .iter()
1382 .map(|table| table.uncompressed_file_size)
1383 .sum::<u64>();
1384 original_len != operand.table_infos.len()
1385}
1386
1387fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1388 fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1389 format!(
1390 "sstable ids: {:?}",
1391 ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1392 )
1393 }
1394 operand.total_file_size += insert_table_infos
1395 .iter()
1396 .map(|sst| sst.sst_size)
1397 .sum::<u64>();
1398 operand.uncompressed_file_size += insert_table_infos
1399 .iter()
1400 .map(|sst| sst.uncompressed_file_size)
1401 .sum::<u64>();
1402 if operand.level_type == PbLevelType::Overlapping {
1403 operand.level_type = PbLevelType::Nonoverlapping;
1404 operand
1405 .table_infos
1406 .extend(insert_table_infos.iter().cloned());
1407 operand
1408 .table_infos
1409 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1410 assert!(
1411 can_concat(&operand.table_infos),
1412 "{}",
1413 display_sstable_infos(&operand.table_infos)
1414 );
1415 } else if !insert_table_infos.is_empty() {
1416 let sorted_insert: Vec<_> = insert_table_infos
1417 .iter()
1418 .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1419 .cloned()
1420 .collect();
1421 let first = &sorted_insert[0];
1422 let last = &sorted_insert[sorted_insert.len() - 1];
1423 let pos = operand
1424 .table_infos
1425 .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1426 if pos >= operand.table_infos.len()
1427 || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1428 {
1429 operand.table_infos.splice(pos..pos, sorted_insert);
1430 let validate_range = operand
1432 .table_infos
1433 .iter()
1434 .skip(pos.saturating_sub(1))
1435 .take(insert_table_infos.len() + 2)
1436 .collect_vec();
1437 assert!(
1438 can_concat(&validate_range),
1439 "{}",
1440 display_sstable_infos(&validate_range),
1441 );
1442 } else {
1443 warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1446 for i in insert_table_infos {
1447 let pos = operand
1448 .table_infos
1449 .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1450 operand.table_infos.insert(pos, i.clone());
1451 }
1452 assert!(
1453 can_concat(&operand.table_infos),
1454 "{}",
1455 display_sstable_infos(&operand.table_infos)
1456 );
1457 }
1458 }
1459}
1460
1461pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1462 match HummockObjectId::Sstable(0.into()) {
1466 HummockObjectId::Sstable(_) => {}
1467 HummockObjectId::VectorFile(_) => {}
1468 };
1469 version
1470 .levels
1471 .values()
1472 .flat_map(|cg| {
1473 cg.level0()
1474 .sub_levels
1475 .iter()
1476 .chain(cg.levels.iter())
1477 .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1478 })
1479 .chain(version.table_change_log.values().flat_map(|c| {
1480 c.iter().flat_map(|l| {
1481 l.old_value
1482 .iter()
1483 .chain(l.new_value.iter())
1484 .map(|t| (t.object_id, t.file_size))
1485 })
1486 }))
1487 .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1488 .chain(
1489 version
1490 .vector_indexes
1491 .values()
1492 .flat_map(|index| index.get_objects()),
1493 )
1494 .collect()
1495}
1496
1497pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1500 let mut res = Vec::new();
1501 for (group_id, levels) in &version.levels {
1503 if levels.group_id != *group_id {
1505 res.push(format!(
1506 "GROUP {}: inconsistent group id {} in Levels",
1507 group_id, levels.group_id
1508 ));
1509 }
1510
1511 let validate_level = |group: CompactionGroupId,
1512 expected_level_idx: u32,
1513 level: &Level,
1514 res: &mut Vec<String>| {
1515 let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1516 if level.level_idx == 0 {
1517 level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1518 if level.table_infos.is_empty() {
1520 res.push(format!("{}: empty level", level_identifier));
1521 }
1522 } else if level.level_type != PbLevelType::Nonoverlapping {
1523 res.push(format!(
1525 "{}: level type {:?} is not non-overlapping",
1526 level_identifier, level.level_type
1527 ));
1528 }
1529
1530 if level.level_idx != expected_level_idx {
1532 res.push(format!(
1533 "{}: mismatched level idx {}",
1534 level_identifier, expected_level_idx
1535 ));
1536 }
1537
1538 let mut prev_table_info: Option<&SstableInfo> = None;
1539 for table_info in &level.table_infos {
1540 if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1542 res.push(format!(
1543 "{} SST {}: table_ids not sorted",
1544 level_identifier, table_info.object_id
1545 ));
1546 }
1547
1548 if level.level_type == PbLevelType::Nonoverlapping {
1550 if let Some(prev) = prev_table_info.take() {
1551 if prev
1552 .key_range
1553 .compare_right_with(&table_info.key_range.left)
1554 != Ordering::Less
1555 {
1556 res.push(format!(
1557 "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1558 level_identifier, table_info.object_id, prev, table_info
1559 ));
1560 }
1561 }
1562 let _ = prev_table_info.insert(table_info);
1563 }
1564 }
1565 };
1566
1567 let l0 = &levels.l0;
1568 let mut prev_sub_level_id = u64::MAX;
1569 for sub_level in &l0.sub_levels {
1570 if sub_level.sub_level_id >= prev_sub_level_id {
1572 res.push(format!(
1573 "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1574 group_id, sub_level.level_idx, prev_sub_level_id
1575 ));
1576 }
1577 prev_sub_level_id = sub_level.sub_level_id;
1578
1579 validate_level(*group_id, 0, sub_level, &mut res);
1580 }
1581
1582 for idx in 1..=levels.levels.len() {
1583 validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1584 }
1585 }
1586 res
1587}
1588
1589#[cfg(test)]
1590mod tests {
1591 use std::collections::{HashMap, HashSet};
1592
1593 use bytes::Bytes;
1594 use risingwave_common::catalog::TableId;
1595 use risingwave_common::hash::VirtualNode;
1596 use risingwave_common::util::epoch::test_epoch;
1597 use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1598
1599 use super::group_split;
1600 use crate::HummockVersionId;
1601 use crate::compaction_group::group_split::*;
1602 use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1603 use crate::key::{FullKey, gen_key_from_str};
1604 use crate::key_range::KeyRange;
1605 use crate::level::{Level, Levels, OverlappingLevel};
1606 use crate::sstable_info::{SstableInfo, SstableInfoInner};
1607 use crate::version::{
1608 GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1609 };
1610
1611 fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1612 gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1613 }
1614
1615 fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1616 let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1617 let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1618 let full_key_l = FullKey::for_test(
1619 TableId::new(*table_ids.first().unwrap()),
1620 table_key_l,
1621 epoch,
1622 )
1623 .encode();
1624 let full_key_r =
1625 FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1626 .encode();
1627
1628 SstableInfoInner {
1629 sst_id: sst_id.into(),
1630 key_range: KeyRange {
1631 left: full_key_l.into(),
1632 right: full_key_r.into(),
1633 right_exclusive: false,
1634 },
1635 table_ids,
1636 object_id: sst_id.into(),
1637 min_epoch: 20,
1638 max_epoch: 20,
1639 file_size: 100,
1640 sst_size: 100,
1641 ..Default::default()
1642 }
1643 }
1644
1645 #[test]
1646 fn test_get_sst_object_ids() {
1647 let mut version = HummockVersion {
1648 id: HummockVersionId::new(0),
1649 levels: HashMap::from_iter([(
1650 0,
1651 Levels {
1652 levels: vec![],
1653 l0: OverlappingLevel {
1654 sub_levels: vec![],
1655 total_file_size: 0,
1656 uncompressed_file_size: 0,
1657 },
1658 ..Default::default()
1659 },
1660 )]),
1661 ..Default::default()
1662 };
1663 assert_eq!(version.get_object_ids(false).count(), 0);
1664
1665 version
1667 .levels
1668 .get_mut(&0)
1669 .unwrap()
1670 .l0
1671 .sub_levels
1672 .push(Level {
1673 table_infos: vec![
1674 SstableInfoInner {
1675 object_id: 11.into(),
1676 sst_id: 11.into(),
1677 ..Default::default()
1678 }
1679 .into(),
1680 ],
1681 ..Default::default()
1682 });
1683 assert_eq!(version.get_object_ids(false).count(), 1);
1684
1685 version.levels.get_mut(&0).unwrap().levels.push(Level {
1687 table_infos: vec![
1688 SstableInfoInner {
1689 object_id: 22.into(),
1690 sst_id: 22.into(),
1691 ..Default::default()
1692 }
1693 .into(),
1694 ],
1695 ..Default::default()
1696 });
1697 assert_eq!(version.get_object_ids(false).count(), 2);
1698 }
1699
1700 #[test]
1701 fn test_apply_version_delta() {
1702 let mut version = HummockVersion {
1703 id: HummockVersionId::new(0),
1704 levels: HashMap::from_iter([
1705 (
1706 0,
1707 build_initial_compaction_group_levels(
1708 0,
1709 &CompactionConfig {
1710 max_level: 6,
1711 ..Default::default()
1712 },
1713 ),
1714 ),
1715 (
1716 1,
1717 build_initial_compaction_group_levels(
1718 1,
1719 &CompactionConfig {
1720 max_level: 6,
1721 ..Default::default()
1722 },
1723 ),
1724 ),
1725 ]),
1726 ..Default::default()
1727 };
1728 let version_delta = HummockVersionDelta {
1729 id: HummockVersionId::new(1),
1730 group_deltas: HashMap::from_iter([
1731 (
1732 2,
1733 GroupDeltas {
1734 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1735 group_config: Some(CompactionConfig {
1736 max_level: 6,
1737 ..Default::default()
1738 }),
1739 ..Default::default()
1740 }))],
1741 },
1742 ),
1743 (
1744 0,
1745 GroupDeltas {
1746 group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1747 },
1748 ),
1749 (
1750 1,
1751 GroupDeltas {
1752 group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1753 1,
1754 0,
1755 HashSet::new(),
1756 vec![
1757 SstableInfoInner {
1758 object_id: 1.into(),
1759 sst_id: 1.into(),
1760 ..Default::default()
1761 }
1762 .into(),
1763 ],
1764 0,
1765 version
1766 .levels
1767 .get(&1)
1768 .as_ref()
1769 .unwrap()
1770 .compaction_group_version_id,
1771 ))],
1772 },
1773 ),
1774 ]),
1775 ..Default::default()
1776 };
1777 let version_delta = version_delta;
1778
1779 version.apply_version_delta(&version_delta);
1780 let mut cg1 = build_initial_compaction_group_levels(
1781 1,
1782 &CompactionConfig {
1783 max_level: 6,
1784 ..Default::default()
1785 },
1786 );
1787 cg1.levels[0] = Level {
1788 level_idx: 1,
1789 level_type: LevelType::Nonoverlapping,
1790 table_infos: vec![
1791 SstableInfoInner {
1792 object_id: 1.into(),
1793 sst_id: 1.into(),
1794 ..Default::default()
1795 }
1796 .into(),
1797 ],
1798 ..Default::default()
1799 };
1800 assert_eq!(
1801 version,
1802 HummockVersion {
1803 id: HummockVersionId::new(1),
1804 levels: HashMap::from_iter([
1805 (
1806 2,
1807 build_initial_compaction_group_levels(
1808 2,
1809 &CompactionConfig {
1810 max_level: 6,
1811 ..Default::default()
1812 },
1813 ),
1814 ),
1815 (1, cg1),
1816 ]),
1817 ..Default::default()
1818 }
1819 );
1820 }
1821
1822 fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1823 gen_sst_info_impl(object_id, table_ids, left, right).into()
1824 }
1825
1826 fn gen_sst_info_impl(
1827 object_id: u64,
1828 table_ids: Vec<u32>,
1829 left: Bytes,
1830 right: Bytes,
1831 ) -> SstableInfoInner {
1832 SstableInfoInner {
1833 object_id: object_id.into(),
1834 sst_id: object_id.into(),
1835 key_range: KeyRange {
1836 left,
1837 right,
1838 right_exclusive: false,
1839 },
1840 table_ids,
1841 file_size: 100,
1842 sst_size: 100,
1843 uncompressed_file_size: 100,
1844 ..Default::default()
1845 }
1846 }
1847
1848 #[test]
1849 fn test_merge_levels() {
1850 let mut left_levels = build_initial_compaction_group_levels(
1851 1,
1852 &CompactionConfig {
1853 max_level: 6,
1854 ..Default::default()
1855 },
1856 );
1857
1858 let mut right_levels = build_initial_compaction_group_levels(
1859 2,
1860 &CompactionConfig {
1861 max_level: 6,
1862 ..Default::default()
1863 },
1864 );
1865
1866 left_levels.levels[0] = Level {
1867 level_idx: 1,
1868 level_type: LevelType::Nonoverlapping,
1869 table_infos: vec![
1870 gen_sst_info(
1871 1,
1872 vec![3],
1873 FullKey::for_test(
1874 TableId::new(3),
1875 gen_key_from_str(VirtualNode::from_index(1), "1"),
1876 0,
1877 )
1878 .encode()
1879 .into(),
1880 FullKey::for_test(
1881 TableId::new(3),
1882 gen_key_from_str(VirtualNode::from_index(200), "1"),
1883 0,
1884 )
1885 .encode()
1886 .into(),
1887 ),
1888 gen_sst_info(
1889 10,
1890 vec![3, 4],
1891 FullKey::for_test(
1892 TableId::new(3),
1893 gen_key_from_str(VirtualNode::from_index(201), "1"),
1894 0,
1895 )
1896 .encode()
1897 .into(),
1898 FullKey::for_test(
1899 TableId::new(4),
1900 gen_key_from_str(VirtualNode::from_index(10), "1"),
1901 0,
1902 )
1903 .encode()
1904 .into(),
1905 ),
1906 gen_sst_info(
1907 11,
1908 vec![4],
1909 FullKey::for_test(
1910 TableId::new(4),
1911 gen_key_from_str(VirtualNode::from_index(11), "1"),
1912 0,
1913 )
1914 .encode()
1915 .into(),
1916 FullKey::for_test(
1917 TableId::new(4),
1918 gen_key_from_str(VirtualNode::from_index(200), "1"),
1919 0,
1920 )
1921 .encode()
1922 .into(),
1923 ),
1924 ],
1925 total_file_size: 300,
1926 ..Default::default()
1927 };
1928
1929 left_levels.l0.sub_levels.push(Level {
1930 level_idx: 0,
1931 table_infos: vec![gen_sst_info(
1932 3,
1933 vec![3],
1934 FullKey::for_test(
1935 TableId::new(3),
1936 gen_key_from_str(VirtualNode::from_index(1), "1"),
1937 0,
1938 )
1939 .encode()
1940 .into(),
1941 FullKey::for_test(
1942 TableId::new(3),
1943 gen_key_from_str(VirtualNode::from_index(200), "1"),
1944 0,
1945 )
1946 .encode()
1947 .into(),
1948 )],
1949 sub_level_id: 101,
1950 level_type: LevelType::Overlapping,
1951 total_file_size: 100,
1952 ..Default::default()
1953 });
1954
1955 left_levels.l0.sub_levels.push(Level {
1956 level_idx: 0,
1957 table_infos: vec![gen_sst_info(
1958 3,
1959 vec![3],
1960 FullKey::for_test(
1961 TableId::new(3),
1962 gen_key_from_str(VirtualNode::from_index(1), "1"),
1963 0,
1964 )
1965 .encode()
1966 .into(),
1967 FullKey::for_test(
1968 TableId::new(3),
1969 gen_key_from_str(VirtualNode::from_index(200), "1"),
1970 0,
1971 )
1972 .encode()
1973 .into(),
1974 )],
1975 sub_level_id: 103,
1976 level_type: LevelType::Overlapping,
1977 total_file_size: 100,
1978 ..Default::default()
1979 });
1980
1981 left_levels.l0.sub_levels.push(Level {
1982 level_idx: 0,
1983 table_infos: vec![gen_sst_info(
1984 3,
1985 vec![3],
1986 FullKey::for_test(
1987 TableId::new(3),
1988 gen_key_from_str(VirtualNode::from_index(1), "1"),
1989 0,
1990 )
1991 .encode()
1992 .into(),
1993 FullKey::for_test(
1994 TableId::new(3),
1995 gen_key_from_str(VirtualNode::from_index(200), "1"),
1996 0,
1997 )
1998 .encode()
1999 .into(),
2000 )],
2001 sub_level_id: 105,
2002 level_type: LevelType::Nonoverlapping,
2003 total_file_size: 100,
2004 ..Default::default()
2005 });
2006
2007 right_levels.levels[0] = Level {
2008 level_idx: 1,
2009 level_type: LevelType::Nonoverlapping,
2010 table_infos: vec![
2011 gen_sst_info(
2012 1,
2013 vec![5],
2014 FullKey::for_test(
2015 TableId::new(5),
2016 gen_key_from_str(VirtualNode::from_index(1), "1"),
2017 0,
2018 )
2019 .encode()
2020 .into(),
2021 FullKey::for_test(
2022 TableId::new(5),
2023 gen_key_from_str(VirtualNode::from_index(200), "1"),
2024 0,
2025 )
2026 .encode()
2027 .into(),
2028 ),
2029 gen_sst_info(
2030 10,
2031 vec![5, 6],
2032 FullKey::for_test(
2033 TableId::new(5),
2034 gen_key_from_str(VirtualNode::from_index(201), "1"),
2035 0,
2036 )
2037 .encode()
2038 .into(),
2039 FullKey::for_test(
2040 TableId::new(6),
2041 gen_key_from_str(VirtualNode::from_index(10), "1"),
2042 0,
2043 )
2044 .encode()
2045 .into(),
2046 ),
2047 gen_sst_info(
2048 11,
2049 vec![6],
2050 FullKey::for_test(
2051 TableId::new(6),
2052 gen_key_from_str(VirtualNode::from_index(11), "1"),
2053 0,
2054 )
2055 .encode()
2056 .into(),
2057 FullKey::for_test(
2058 TableId::new(6),
2059 gen_key_from_str(VirtualNode::from_index(200), "1"),
2060 0,
2061 )
2062 .encode()
2063 .into(),
2064 ),
2065 ],
2066 total_file_size: 300,
2067 ..Default::default()
2068 };
2069
2070 right_levels.l0.sub_levels.push(Level {
2071 level_idx: 0,
2072 table_infos: vec![gen_sst_info(
2073 3,
2074 vec![5],
2075 FullKey::for_test(
2076 TableId::new(5),
2077 gen_key_from_str(VirtualNode::from_index(1), "1"),
2078 0,
2079 )
2080 .encode()
2081 .into(),
2082 FullKey::for_test(
2083 TableId::new(5),
2084 gen_key_from_str(VirtualNode::from_index(200), "1"),
2085 0,
2086 )
2087 .encode()
2088 .into(),
2089 )],
2090 sub_level_id: 101,
2091 level_type: LevelType::Overlapping,
2092 total_file_size: 100,
2093 ..Default::default()
2094 });
2095
2096 right_levels.l0.sub_levels.push(Level {
2097 level_idx: 0,
2098 table_infos: vec![gen_sst_info(
2099 5,
2100 vec![5],
2101 FullKey::for_test(
2102 TableId::new(5),
2103 gen_key_from_str(VirtualNode::from_index(1), "1"),
2104 0,
2105 )
2106 .encode()
2107 .into(),
2108 FullKey::for_test(
2109 TableId::new(5),
2110 gen_key_from_str(VirtualNode::from_index(200), "1"),
2111 0,
2112 )
2113 .encode()
2114 .into(),
2115 )],
2116 sub_level_id: 102,
2117 level_type: LevelType::Overlapping,
2118 total_file_size: 100,
2119 ..Default::default()
2120 });
2121
2122 right_levels.l0.sub_levels.push(Level {
2123 level_idx: 0,
2124 table_infos: vec![gen_sst_info(
2125 3,
2126 vec![5],
2127 FullKey::for_test(
2128 TableId::new(5),
2129 gen_key_from_str(VirtualNode::from_index(1), "1"),
2130 0,
2131 )
2132 .encode()
2133 .into(),
2134 FullKey::for_test(
2135 TableId::new(5),
2136 gen_key_from_str(VirtualNode::from_index(200), "1"),
2137 0,
2138 )
2139 .encode()
2140 .into(),
2141 )],
2142 sub_level_id: 103,
2143 level_type: LevelType::Nonoverlapping,
2144 total_file_size: 100,
2145 ..Default::default()
2146 });
2147
2148 {
2149 let mut left_levels = Levels::default();
2151 let right_levels = Levels::default();
2152
2153 group_split::merge_levels(&mut left_levels, right_levels);
2154 }
2155
        {
            let mut left_levels = build_initial_compaction_group_levels(
                1,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

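        // Merging an empty right group into a populated left group leaves the left group unchanged.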
        {
            let mut left_levels = left_levels.clone();
            let right_levels = build_initial_compaction_group_levels(
                2,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

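        // Merging two populated groups: the right group's sub-levels are appended with fresh
        // sub_level_ids (106-108 here) and the L1 file sizes are summed.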
        {
            let mut left_levels = left_levels.clone();
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 6);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(600, left_levels.levels[0].total_file_size);
        }
    }

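    // `get_split_pos` is expected to return the index of the first SST affected by the
    // split key, and 0 for an empty SST list.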
    #[test]
    fn test_get_split_pos() {
        let epoch = test_epoch(1);
        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
        let s3 = gen_sstable_info(3, vec![6, 7], epoch);

        let ssts = vec![s1, s2, s3];
        let split_key = group_split::build_split_key(4, VirtualNode::ZERO);

        let pos = group_split::get_split_pos(&ssts, split_key.clone());
        assert_eq!(1, pos);

        let pos = group_split::get_split_pos(&vec![], split_key);
        assert_eq!(0, pos);
    }

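    // Exercises `need_to_split` / `split_sst` on a single SST covering table ids [1, 2, 3, 5].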
    #[test]
    fn test_split_sst() {
        let epoch = test_epoch(1);
        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);

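        // Split at table 3: the SST is split in both directions; tables 1-2 stay in the
        // origin SST while tables 3 and 5 move to the branched SST.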
        {
            let split_key = group_split::build_split_key(3, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(
                origin_sst
                    .key_range
                    .right
                    .cmp(&branched_sst.key_range.left)
                    .is_le()
            );
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(&3, branched_sst.table_ids.first().unwrap());
        }

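        // Split at table 4, which is not present in the SST: tables 1-3 stay in the origin
        // SST while table 5 moves to the branched SST.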
        {
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(&5, branched_sst.table_ids.first().unwrap());
        }

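        // Split at table 6, beyond the SST's largest table id: the SST belongs entirely to
        // the left side and does not need to be split.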
        {
            let split_key = group_split::build_split_key(6, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Left, split_type);
        }

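        // Classification only: a split key inside the SST's table range yields `Both`, while
        // one at or before its first table id yields `Right`.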
        {
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Both, split_type);

            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Right, split_type);
        }

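        // Degenerate SST whose key range collapses to a single key: splitting at its only
        // table id yields no origin SST, only the branched one.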
        {
            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
            sst.key_range.right = sst.key_range.left.clone();
            let sst: SstableInfo = sst.into();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            assert!(origin_sst.is_none());
            assert!(branched_sst.is_some());
        }
    }

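    // Exercises `split_sst_info_for_level_v2` on a compaction group with one populated L1
    // level and one overlapping L0 sub-level.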
    #[test]
    fn test_split_sst_info_for_level() {
        let mut version = HummockVersion {
            id: HummockVersionId(0),
            levels: HashMap::from_iter([(
                1,
                build_initial_compaction_group_levels(
                    1,
                    &CompactionConfig {
                        max_level: 6,
                        ..Default::default()
                    },
                ),
            )]),
            ..Default::default()
        };

        let cg1 = version.levels.get_mut(&1).unwrap();

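        // L1 holds three non-overlapping SSTs: SST 1 (table 3), SST 10 (tables 3 and 4) and
        // SST 11 (table 4).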
        cg1.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![3],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![3, 4],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![4],
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

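        // L0 gets a single overlapping sub-level (id 101) with five SSTs, all registered to
        // table 2.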
        cg1.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![
                gen_sst_info(
                    2,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    22,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    23,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    24,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    25,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 300,
            ..Default::default()
        });

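        // Split the L0 sub-level at table 1 and merge the split-off SSTs back into cg1 as a
        // temporary right group.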
        {
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.l0.sub_levels[0],
                &mut new_sst_id,
                split_key,
            );
            let mut right_l0 = OverlappingLevel {
                sub_levels: vec![],
                total_file_size: 0,
                uncompressed_file_size: 0,
            };

            right_l0.sub_levels.push(Level {
                level_idx: 0,
                table_infos: x,
                sub_level_id: 101,
                total_file_size: 100,
                level_type: LevelType::Overlapping,
                ..Default::default()
            });

            let right_levels = Levels {
                levels: vec![],
                l0: right_l0,
                ..Default::default()
            };

            merge_levels(cg1, right_levels);
        }

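        // Splitting an empty level yields no SSTs.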
        {
            let mut new_sst_id = 100.into();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[2],
                &mut new_sst_id,
                split_key,
            );

            assert!(x.is_empty());
        }

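        // A split key left of every SST in L1 moves all three SSTs to the right side, keeping
        // their ids and sizes, and leaves the level empty.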
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(3, x.len());
            assert_eq!(1, x[0].sst_id);
            assert_eq!(100, x[0].sst_size);
            assert_eq!(10, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(11, x[2].sst_id);
            assert_eq!(100, x[2].sst_size);

            assert_eq!(0, cg1.levels[0].table_infos.len());
        }

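        // A split key right of every SST in L1 splits nothing off; the level keeps its three SSTs.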
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(5, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(0, x.len());
            assert_eq!(3, cg1.levels[0].table_infos.len());
        }

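        // A split key at table 4 falls inside SST 10, so it is physically split: the right half
        // (new id 100, half the size) moves right together with SST 11, while the left half
        // (new id 101, table 3) stays in the level with SST 1.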
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(2, x.len());
            assert_eq!(100, x[0].sst_id);
            assert_eq!(100 / 2, x[0].sst_size);
            assert_eq!(11, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(vec![4], x[1].table_ids);

            assert_eq!(2, cg1.levels[0].table_infos.len());
            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
            assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
        }
    }
}