use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::iter::once;
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_pb::hummock::{
    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
};
use tracing::warn;

use super::group_split::split_sst_with_table_ids;
use super::{StateTableId, group_split};
use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
use crate::compact_task::is_compaction_task_expired;
use crate::compaction_group::StaticCompactionGroupId;
use crate::key_range::KeyRangeCommon;
use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
use crate::sstable_info::SstableInfo;
use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
use crate::version::{
    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
};
use crate::{
    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
};

#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    pub insert_sst_level: u32,
    pub insert_sst_infos: Vec<SstableInfo>,
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}

pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;

impl<L> HummockVersionCommon<SstableInfo, L> {
    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
        self.levels
            .get(&compaction_group_id)
            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
    }

    pub fn get_compaction_group_levels_mut(
        &mut self,
        compaction_group_id: CompactionGroupId,
    ) -> &mut Levels {
        self.levels
            .get_mut(&compaction_group_id)
            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
    }

    pub fn get_sst_ids_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> impl Iterator<Item = HummockSstableId> + '_ {
        self.levels
            .iter()
            .filter_map(move |(cg_id, level)| {
                if *cg_id == compaction_group_id {
                    Some(level)
                } else {
                    None
                }
            })
            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
            .flat_map(|level| level.table_infos.iter())
            .map(|s| s.sst_id)
    }

    pub fn level_iter<F: FnMut(&Level) -> bool>(
        &self,
        compaction_group_id: CompactionGroupId,
        mut f: F,
    ) {
        if let Some(levels) = self.levels.get(&compaction_group_id) {
            for sub_level in &levels.l0.sub_levels {
                if !f(sub_level) {
                    return;
                }
            }
            for level in &levels.levels {
                if !f(level) {
                    return;
                }
            }
        }
    }

    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
        self.levels
            .get(&compaction_group_id)
            .map(|group| group.levels.len() + 1)
            .unwrap_or(0)
    }

    pub fn safe_epoch_table_watermarks(
        &self,
        existing_table_ids: &[u32],
    ) -> BTreeMap<u32, TableWatermarks> {
        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
    }
}

pub fn safe_epoch_table_watermarks_impl(
    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
    existing_table_ids: &[u32],
) -> BTreeMap<u32, TableWatermarks> {
    fn extract_single_table_watermark(
        table_watermarks: &TableWatermarks,
    ) -> Option<TableWatermarks> {
        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
            Some(TableWatermarks {
                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
                direction: table_watermarks.direction,
                watermark_type: table_watermarks.watermark_type,
            })
        } else {
            None
        }
    }
    table_watermarks
        .iter()
        .filter_map(|(table_id, table_watermarks)| {
            let u32_table_id = table_id.table_id();
            if !existing_table_ids.contains(&u32_table_id) {
                None
            } else {
                extract_single_table_watermark(table_watermarks)
                    .map(|table_watermarks| (table_id.table_id, table_watermarks))
            }
        })
        .collect()
}

pub fn safe_epoch_read_table_watermarks_impl(
    safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
) -> BTreeMap<TableId, ReadTableWatermark> {
    safe_epoch_watermarks
        .into_iter()
        .map(|(table_id, watermarks)| {
            assert_eq!(watermarks.watermarks.len(), 1);
            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
            let mut vnode_watermark_map = BTreeMap::new();
            for vnode_watermark in vnode_watermarks.iter() {
                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                    assert!(
                        vnode_watermark_map
                            .insert(vnode, watermark.clone())
                            .is_none(),
                        "duplicate table watermark on vnode {}",
                        vnode.to_index()
                    );
                }
            }
            (
                TableId::from(table_id),
                ReadTableWatermark {
                    direction: watermarks.direction,
                    vnode_watermarks: vnode_watermark_map,
                },
            )
        })
        .collect()
}
189
190impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
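    /// Counts how many new SSTs a split of `parent_group_id` at `split_key` would create:
    /// every SST that the split key cuts through contributes two new SSTs.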
191 pub fn count_new_ssts_in_group_split(
192 &self,
193 parent_group_id: CompactionGroupId,
194 split_key: Bytes,
195 ) -> u64 {
196 self.levels
197 .get(&parent_group_id)
198 .map_or(0, |parent_levels| {
199 let l0 = &parent_levels.l0;
200 let mut split_count = 0;
201 for sub_level in &l0.sub_levels {
202 assert!(!sub_level.table_infos.is_empty());
203
204 if sub_level.level_type == PbLevelType::Overlapping {
205 split_count += sub_level
207 .table_infos
208 .iter()
209 .map(|sst| {
210 if let group_split::SstSplitType::Both =
211 group_split::need_to_split(sst, split_key.clone())
212 {
213 2
214 } else {
215 0
216 }
217 })
218 .sum::<u64>();
219 continue;
220 }
221
222 let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
223 let sst = sub_level.table_infos.get(pos).unwrap();
224
225 if let group_split::SstSplitType::Both =
226 group_split::need_to_split(sst, split_key.clone())
227 {
228 split_count += 2;
229 }
230 }
231
232 for level in &parent_levels.levels {
233 if level.table_infos.is_empty() {
234 continue;
235 }
236 let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
237 let sst = level.table_infos.get(pos).unwrap();
238 if let group_split::SstSplitType::Both =
239 group_split::need_to_split(sst, split_key.clone())
240 {
241 split_count += 2;
242 }
243 }
244
245 split_count
246 })
247 }
248
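    /// Initializes the new group `group_id` by branching out of `parent_group_id` every SST
    /// that contains any of `member_table_ids`. Branched SSTs take ids starting from
    /// `new_sst_start_id`, and both groups bump their `compaction_group_version_id`.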
249 pub fn init_with_parent_group(
250 &mut self,
251 parent_group_id: CompactionGroupId,
252 group_id: CompactionGroupId,
253 member_table_ids: BTreeSet<StateTableId>,
254 new_sst_start_id: HummockSstableId,
255 ) {
256 let mut new_sst_id = new_sst_start_id;
257 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
258 if new_sst_start_id != 0 {
259 if cfg!(debug_assertions) {
260 panic!(
261 "non-zero sst start id {} for NewCompactionGroup",
262 new_sst_start_id
263 );
264 } else {
265 warn!(
266 %new_sst_start_id,
267 "non-zero sst start id for NewCompactionGroup"
268 );
269 }
270 }
271 return;
272 } else if !self.levels.contains_key(&parent_group_id) {
273 unreachable!(
274 "non-existing parent group id {} to init from",
275 parent_group_id
276 );
277 }
278 let [parent_levels, cur_levels] = self
279 .levels
280 .get_disjoint_mut([&parent_group_id, &group_id])
281 .map(|res| res.unwrap());
282 parent_levels.compaction_group_version_id += 1;
285 cur_levels.compaction_group_version_id += 1;
286 let l0 = &mut parent_levels.l0;
287 {
288 for sub_level in &mut l0.sub_levels {
289 let target_l0 = &mut cur_levels.l0;
290 let insert_table_infos =
293 split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
294 sub_level
295 .table_infos
296 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
297 .for_each(|sst_info| {
298 sub_level.total_file_size -= sst_info.sst_size;
299 sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
300 l0.total_file_size -= sst_info.sst_size;
301 l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
302 });
303 if insert_table_infos.is_empty() {
304 continue;
305 }
306 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
307 Ok(idx) => {
308 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
309 }
310 Err(idx) => {
311 insert_new_sub_level(
312 target_l0,
313 sub_level.sub_level_id,
314 sub_level.level_type,
315 insert_table_infos,
316 Some(idx),
317 );
318 }
319 }
320 }
321
322 l0.sub_levels.retain(|level| !level.table_infos.is_empty());
323 }
324 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
325 let insert_table_infos =
326 split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
327 cur_levels.levels[idx].total_file_size += insert_table_infos
328 .iter()
329 .map(|sst| sst.sst_size)
330 .sum::<u64>();
331 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
332 .iter()
333 .map(|sst| sst.uncompressed_file_size)
334 .sum::<u64>();
335 cur_levels.levels[idx]
336 .table_infos
337 .extend(insert_table_infos);
338 cur_levels.levels[idx]
339 .table_infos
340 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
341 assert!(can_concat(&cur_levels.levels[idx].table_infos));
342 level
343 .table_infos
344 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
345 .for_each(|sst_info| {
346 level.total_file_size -= sst_info.sst_size;
347 level.uncompressed_file_size -= sst_info.uncompressed_file_size;
348 });
349 }
350
351 assert!(
352 parent_levels
353 .l0
354 .sub_levels
355 .iter()
356 .all(|level| !level.table_infos.is_empty())
357 );
358 assert!(
359 cur_levels
360 .l0
361 .sub_levels
362 .iter()
363 .all(|level| !level.table_infos.is_empty())
364 );
365 }
366
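    /// Summarizes, per compaction group, the SSTs inserted and the object ids deleted by
    /// `version_delta`. Trivial-move deltas produce no entries, and groups whose deltas are not
    /// purely intra-level or new-L0-sub-level changes are skipped.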
367 pub fn build_sst_delta_infos(
368 &self,
369 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
370 ) -> Vec<SstDeltaInfo> {
371 let mut infos = vec![];
372
373 if version_delta.trivial_move {
376 return infos;
377 }
378
379 for (group_id, group_deltas) in &version_delta.group_deltas {
380 let mut info = SstDeltaInfo::default();
381
382 let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
383 let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
384
385 if !group_deltas.group_deltas.iter().all(|delta| {
387 matches!(
388 delta,
389 GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
390 )
391 }) {
392 continue;
393 }
394
395 for group_delta in &group_deltas.group_deltas {
399 match group_delta {
400 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
401 if !inserted_table_infos.is_empty() {
402 info.insert_sst_level = 0;
403 info.insert_sst_infos
404 .extend(inserted_table_infos.iter().cloned());
405 }
406 }
407 GroupDeltaCommon::IntraLevel(intra_level) => {
408 if !intra_level.inserted_table_infos.is_empty() {
409 info.insert_sst_level = intra_level.level_idx;
410 info.insert_sst_infos
411 .extend(intra_level.inserted_table_infos.iter().cloned());
412 }
413 if !intra_level.removed_table_ids.is_empty() {
414 for id in &intra_level.removed_table_ids {
415 if intra_level.level_idx == 0 {
416 removed_l0_ssts.insert(*id);
417 } else {
418 removed_ssts
419 .entry(intra_level.level_idx)
420 .or_default()
421 .insert(*id);
422 }
423 }
424 }
425 }
426 GroupDeltaCommon::GroupConstruct(_)
427 | GroupDeltaCommon::GroupDestroy(_)
428 | GroupDeltaCommon::GroupMerge(_) => {}
429 }
430 }
431
432 let group = self.levels.get(group_id).unwrap();
433 for l0_sub_level in &group.level0().sub_levels {
434 for sst_info in &l0_sub_level.table_infos {
435 if removed_l0_ssts.remove(&sst_info.sst_id) {
436 info.delete_sst_object_ids.push(sst_info.object_id);
437 }
438 }
439 }
440 for level in &group.levels {
441 if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
442 for sst_info in &level.table_infos {
443 if removed_level_ssts.remove(&sst_info.sst_id) {
444 info.delete_sst_object_ids.push(sst_info.object_id);
445 }
446 }
447 if !removed_level_ssts.is_empty() {
448 tracing::error!(
449 "removed_level_ssts is not empty: {:?}",
450 removed_level_ssts,
451 );
452 }
453 debug_assert!(removed_level_ssts.is_empty());
454 }
455 }
456
457 if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
458 tracing::error!(
459 "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
460 removed_l0_ssts,
461 removed_ssts
462 );
463 }
464 debug_assert!(removed_l0_ssts.is_empty());
465 debug_assert!(removed_ssts.is_empty());
466
467 infos.push(info);
468 }
469
470 infos
471 }
472
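    /// Applies `version_delta` to this version in place: group construct/destroy/merge,
    /// level changes, table watermarks, state table info and table change logs.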
473 pub fn apply_version_delta(
474 &mut self,
475 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
476 ) {
477 assert_eq!(self.id, version_delta.prev_id);
478
479 let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
480 &version_delta.state_table_info_delta,
481 &version_delta.removed_table_ids,
482 );
483
484 #[expect(deprecated)]
485 {
486 if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
487 is_commit_epoch = true;
488 tracing::trace!(
489 "max committed epoch bumped but no table committed epoch is changed"
490 );
491 }
492 }
493
494 for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
496 let mut is_applied_l0_compact = false;
497 for group_delta in &group_deltas.group_deltas {
498 match group_delta {
499 GroupDeltaCommon::GroupConstruct(group_construct) => {
500 let mut new_levels = build_initial_compaction_group_levels(
501 *compaction_group_id,
502 group_construct.get_group_config().unwrap(),
503 );
504 let parent_group_id = group_construct.parent_group_id;
505 new_levels.parent_group_id = parent_group_id;
506 #[expect(deprecated)]
507 new_levels
509 .member_table_ids
510 .clone_from(&group_construct.table_ids);
511 self.levels.insert(*compaction_group_id, new_levels);
512 let member_table_ids = if group_construct.version()
513 >= CompatibilityVersion::NoMemberTableIds
514 {
515 self.state_table_info
516 .compaction_group_member_table_ids(*compaction_group_id)
517 .iter()
518 .map(|table_id| table_id.table_id)
519 .collect()
520 } else {
521 #[expect(deprecated)]
522 BTreeSet::from_iter(group_construct.table_ids.clone())
524 };
525
526 if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
527 let split_key = if group_construct.split_key.is_some() {
528 Some(Bytes::from(group_construct.split_key.clone().unwrap()))
529 } else {
530 None
531 };
532 self.init_with_parent_group_v2(
533 parent_group_id,
534 *compaction_group_id,
535 group_construct.get_new_sst_start_id().into(),
536 split_key.clone(),
537 );
538 } else {
539 self.init_with_parent_group(
541 parent_group_id,
542 *compaction_group_id,
543 member_table_ids,
544 group_construct.get_new_sst_start_id().into(),
545 );
546 }
547 }
548 GroupDeltaCommon::GroupMerge(group_merge) => {
549 tracing::info!(
550 "group_merge left {:?} right {:?}",
551 group_merge.left_group_id,
552 group_merge.right_group_id
553 );
554 self.merge_compaction_group(
555 group_merge.left_group_id,
556 group_merge.right_group_id,
557 )
558 }
559 GroupDeltaCommon::IntraLevel(level_delta) => {
560 let levels =
561 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
562 panic!("compaction group {} does not exist", compaction_group_id)
563 });
564 if is_commit_epoch {
565 assert!(
566 level_delta.removed_table_ids.is_empty(),
567 "no sst should be deleted when committing an epoch"
568 );
569
570 let IntraLevelDelta {
571 level_idx,
572 l0_sub_level_id,
573 inserted_table_infos,
574 ..
575 } = level_delta;
576 {
577 assert_eq!(
578 *level_idx, 0,
579 "we should only add to L0 when we commit an epoch."
580 );
581 if !inserted_table_infos.is_empty() {
582 insert_new_sub_level(
583 &mut levels.l0,
584 *l0_sub_level_id,
585 PbLevelType::Overlapping,
586 inserted_table_infos.clone(),
587 None,
588 );
589 }
590 }
591 } else {
592 levels.apply_compact_ssts(
594 level_delta,
595 self.state_table_info
596 .compaction_group_member_table_ids(*compaction_group_id),
597 );
598 if level_delta.level_idx == 0 {
599 is_applied_l0_compact = true;
600 }
601 }
602 }
603 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
604 let levels =
605 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
606 panic!("compaction group {} does not exist", compaction_group_id)
607 });
608 assert!(is_commit_epoch);
609
610 if !inserted_table_infos.is_empty() {
611 let next_l0_sub_level_id = levels
612 .l0
613 .sub_levels
614 .last()
615 .map(|level| level.sub_level_id + 1)
616 .unwrap_or(1);
617
618 insert_new_sub_level(
619 &mut levels.l0,
620 next_l0_sub_level_id,
621 PbLevelType::Overlapping,
622 inserted_table_infos.clone(),
623 None,
624 );
625 }
626 }
627 GroupDeltaCommon::GroupDestroy(_) => {
628 self.levels.remove(compaction_group_id);
629 }
630 }
631 }
632 if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
633 {
634 levels.post_apply_l0_compact();
635 }
636 }
637 self.id = version_delta.id;
638 #[expect(deprecated)]
639 {
640 self.max_committed_epoch = version_delta.max_committed_epoch;
641 }
642
643 let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
647 HashMap::new();
648
649 for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
651 if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
652 if version_delta.removed_table_ids.contains(table_id) {
653 modified_table_watermarks.insert(*table_id, None);
654 } else {
655 let mut current_table_watermarks = (**current_table_watermarks).clone();
656 current_table_watermarks.apply_new_table_watermarks(table_watermarks);
657 modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
658 }
659 } else {
660 modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
661 }
662 }
663 for (table_id, table_watermarks) in &self.table_watermarks {
664 let safe_epoch = if let Some(state_table_info) =
665 self.state_table_info.info().get(table_id)
666 && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
667 && state_table_info.committed_epoch > *oldest_epoch
668 {
669 state_table_info.committed_epoch
671 } else {
672 continue;
674 };
675 let table_watermarks = modified_table_watermarks
676 .entry(*table_id)
677 .or_insert_with(|| Some((**table_watermarks).clone()));
678 if let Some(table_watermarks) = table_watermarks {
679 table_watermarks.clear_stale_epoch_watermark(safe_epoch);
680 }
681 }
682 for (table_id, table_watermarks) in modified_table_watermarks {
684 if let Some(table_watermarks) = table_watermarks {
685 self.table_watermarks
686 .insert(table_id, Arc::new(table_watermarks));
687 } else {
688 self.table_watermarks.remove(&table_id);
689 }
690 }
691
692 Self::apply_change_log_delta(
694 &mut self.table_change_log,
695 &version_delta.change_log_delta,
696 &version_delta.removed_table_ids,
697 &version_delta.state_table_info_delta,
698 &changed_table_info,
699 );
700 }
701
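    /// Appends the new change log entries from `change_log_delta`, drops the logs of removed
    /// tables and of tables that committed a new epoch without a matching change log entry,
    /// then truncates each log that has a delta to that delta's `truncate_epoch`.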
702 pub fn apply_change_log_delta<T: Clone>(
703 table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
704 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
705 removed_table_ids: &HashSet<TableId>,
706 state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
707 changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
708 ) {
709 for (table_id, change_log_delta) in change_log_delta {
710 let new_change_log = &change_log_delta.new_log;
711 match table_change_log.entry(*table_id) {
712 Entry::Occupied(entry) => {
713 let change_log = entry.into_mut();
714 change_log.add_change_log(new_change_log.clone());
715 }
716 Entry::Vacant(entry) => {
717 entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
718 }
719 };
720 }
721
722 table_change_log.retain(|table_id, _| {
726 if removed_table_ids.contains(table_id) {
727 return false;
728 }
            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
                && table_info_delta.committed_epoch > prev_table_info.committed_epoch
            {
                // The table advanced its committed epoch in this delta; fall through to check
                // whether a change log entry accompanies the newly committed epoch.
            } else {
                // Otherwise keep the existing change log untouched.
                return true;
            }
736 let contains = change_log_delta.contains_key(table_id);
737 if !contains {
738 warn!(
739 ?table_id,
740 "table change log dropped due to no further change log at newly committed epoch",
741 );
742 }
743 contains
744 });
745
746 for (table_id, change_log_delta) in change_log_delta {
748 if let Some(change_log) = table_change_log.get_mut(table_id) {
749 change_log.truncate(change_log_delta.truncate_epoch);
750 }
751 }
752 }
753
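    /// Collects branched SSTs, i.e. SSTs whose `sst_id` differs from their `object_id`,
    /// grouped by object id and then by the compaction groups referencing them.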
754 pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
755 let mut ret: BTreeMap<_, _> = BTreeMap::new();
756 for (compaction_group_id, group) in &self.levels {
757 let mut levels = vec![];
758 levels.extend(group.l0.sub_levels.iter());
759 levels.extend(group.levels.iter());
760 for level in levels {
761 for table_info in &level.table_infos {
762 if table_info.sst_id.inner() == table_info.object_id.inner() {
763 continue;
764 }
765 let object_id = table_info.object_id;
766 let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
767 entry
768 .entry(*compaction_group_id)
769 .or_default()
770 .push(table_info.sst_id)
771 }
772 }
773 }
774 ret
775 }
776
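    /// Merges the levels of `right_group_id` into `left_group_id` and removes the right group.
    /// The member table ids of the left group followed by those of the right group must form a
    /// sorted sequence, i.e. the two groups are adjacent in table-id order.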
777 pub fn merge_compaction_group(
778 &mut self,
779 left_group_id: CompactionGroupId,
780 right_group_id: CompactionGroupId,
781 ) {
782 let left_group_id_table_ids = self
784 .state_table_info
785 .compaction_group_member_table_ids(left_group_id)
786 .iter()
787 .map(|table_id| table_id.table_id);
788 let right_group_id_table_ids = self
789 .state_table_info
790 .compaction_group_member_table_ids(right_group_id)
791 .iter()
792 .map(|table_id| table_id.table_id);
793
794 assert!(
795 left_group_id_table_ids
796 .chain(right_group_id_table_ids)
797 .is_sorted()
798 );
799
800 let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
801 let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
802 panic!(
803 "compaction group should exist right {} all {:?}",
804 right_group_id, total_cg
805 )
806 });
807
808 let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
809 panic!(
810 "compaction group should exist left {} all {:?}",
811 left_group_id, total_cg
812 )
813 });
814
815 group_split::merge_levels(left_levels, right_levels);
816 }
817
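    /// Split-key variant of [`Self::init_with_parent_group`]: when `split_key` is provided, the
    /// parent group's SSTs are split at that key and the resulting branch SSTs are inserted into
    /// `group_id`; without a `split_key`, nothing is branched.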
818 pub fn init_with_parent_group_v2(
819 &mut self,
820 parent_group_id: CompactionGroupId,
821 group_id: CompactionGroupId,
822 new_sst_start_id: HummockSstableId,
823 split_key: Option<Bytes>,
824 ) {
825 let mut new_sst_id = new_sst_start_id;
826 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
827 if new_sst_start_id != 0 {
828 if cfg!(debug_assertions) {
829 panic!(
830 "non-zero sst start id {} for NewCompactionGroup",
831 new_sst_start_id
832 );
833 } else {
834 warn!(
835 %new_sst_start_id,
836 "non-zero sst start id for NewCompactionGroup"
837 );
838 }
839 }
840 return;
841 } else if !self.levels.contains_key(&parent_group_id) {
842 unreachable!(
843 "non-existing parent group id {} to init from (V2)",
844 parent_group_id
845 );
846 }
847
848 let [parent_levels, cur_levels] = self
849 .levels
850 .get_disjoint_mut([&parent_group_id, &group_id])
851 .map(|res| res.unwrap());
852 parent_levels.compaction_group_version_id += 1;
855 cur_levels.compaction_group_version_id += 1;
856
857 let l0 = &mut parent_levels.l0;
858 {
859 for sub_level in &mut l0.sub_levels {
860 let target_l0 = &mut cur_levels.l0;
861 let insert_table_infos = if let Some(split_key) = &split_key {
864 group_split::split_sst_info_for_level_v2(
865 sub_level,
866 &mut new_sst_id,
867 split_key.clone(),
868 )
869 } else {
870 vec![]
871 };
872
873 if insert_table_infos.is_empty() {
874 continue;
875 }
876
877 sub_level
878 .table_infos
879 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
880 .for_each(|sst_info| {
881 sub_level.total_file_size -= sst_info.sst_size;
882 sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
883 l0.total_file_size -= sst_info.sst_size;
884 l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
885 });
886 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
887 Ok(idx) => {
888 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
889 }
890 Err(idx) => {
891 insert_new_sub_level(
892 target_l0,
893 sub_level.sub_level_id,
894 sub_level.level_type,
895 insert_table_infos,
896 Some(idx),
897 );
898 }
899 }
900 }
901
902 l0.sub_levels.retain(|level| !level.table_infos.is_empty());
903 }
904
905 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
906 let insert_table_infos = if let Some(split_key) = &split_key {
907 group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
908 } else {
909 vec![]
910 };
911
912 if insert_table_infos.is_empty() {
913 continue;
914 }
915
916 cur_levels.levels[idx].total_file_size += insert_table_infos
917 .iter()
918 .map(|sst| sst.sst_size)
919 .sum::<u64>();
920 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
921 .iter()
922 .map(|sst| sst.uncompressed_file_size)
923 .sum::<u64>();
924 cur_levels.levels[idx]
925 .table_infos
926 .extend(insert_table_infos);
927 cur_levels.levels[idx]
928 .table_infos
929 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
930 assert!(can_concat(&cur_levels.levels[idx].table_infos));
931 level
932 .table_infos
933 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
934 .for_each(|sst_info| {
935 level.total_file_size -= sst_info.sst_size;
936 level.uncompressed_file_size -= sst_info.uncompressed_file_size;
937 });
938 }
939
940 assert!(
941 parent_levels
942 .l0
943 .sub_levels
944 .iter()
945 .all(|level| !level.table_infos.is_empty())
946 );
947 assert!(
948 cur_levels
949 .l0
950 .sub_levels
951 .iter()
952 .all(|level| !level.table_infos.is_empty())
953 );
954 }
955}
956
957impl<T> HummockVersionCommon<T>
958where
959 T: SstableIdReader + ObjectIdReader,
960{
961 pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
962 self.get_sst_infos(exclude_change_log)
963 .map(|s| s.object_id())
964 .collect()
965 }
966
967 pub fn get_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockObjectId> {
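        // This no-op match fails to compile when a new `HummockObjectId` variant is added,
        // as a reminder to also cover the new object type in this method.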
968 match HummockObjectId::Sstable(0.into()) {
972 HummockObjectId::Sstable(_) => {}
973 };
974 self.get_sst_infos(exclude_change_log)
975 .map(|s| HummockObjectId::Sstable(s.object_id()))
976 .collect()
977 }
978
979 pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
980 self.get_sst_infos(exclude_change_log)
981 .map(|s| s.sst_id())
982 .collect()
983 }
984
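    /// Iterates over all SSTs referenced by this version: every SST in every level plus, unless
    /// `exclude_change_log` is set, the old and new values recorded in table change logs.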
985 pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
986 let may_table_change_log = if exclude_change_log {
987 None
988 } else {
989 Some(self.table_change_log.values())
990 };
991 self.get_combined_levels()
992 .flat_map(|level| level.table_infos.iter())
993 .chain(
994 may_table_change_log
995 .map(|v| {
996 v.flat_map(|table_change_log| {
997 table_change_log.iter().flat_map(|epoch_change_log| {
998 epoch_change_log
999 .old_value
1000 .iter()
1001 .chain(epoch_change_log.new_value.iter())
1002 })
1003 })
1004 })
1005 .into_iter()
1006 .flatten(),
1007 )
1008 }
1009}
1010
1011impl Levels {
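    /// Applies an intra-level delta produced by a compaction task: removes the task's input SSTs
    /// and inserts its output SSTs into the target (sub-)level. Deltas whose
    /// `compaction_group_version_id` indicates an expired task are ignored with a warning.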
1012 pub(crate) fn apply_compact_ssts(
1013 &mut self,
1014 level_delta: &IntraLevelDeltaCommon<SstableInfo>,
1015 member_table_ids: &BTreeSet<TableId>,
1016 ) {
1017 let IntraLevelDeltaCommon {
1018 level_idx,
1019 l0_sub_level_id,
1020 inserted_table_infos: insert_table_infos,
1021 vnode_partition_count,
1022 removed_table_ids: delete_sst_ids_set,
1023 compaction_group_version_id,
1024 } = level_delta;
1025 let new_vnode_partition_count = *vnode_partition_count;
1026
1027 if is_compaction_task_expired(
1028 self.compaction_group_version_id,
1029 *compaction_group_version_id,
1030 ) {
1031 warn!(
1032 current_compaction_group_version_id = self.compaction_group_version_id,
1033 delta_compaction_group_version_id = compaction_group_version_id,
1034 level_idx,
1035 l0_sub_level_id,
1036 insert_table_infos = ?insert_table_infos
1037 .iter()
1038 .map(|sst| (sst.sst_id, sst.object_id))
1039 .collect_vec(),
1040 ?delete_sst_ids_set,
1041 "This VersionDelta may be committed by an expired compact task. Please check it."
1042 );
1043 return;
1044 }
1045 if !delete_sst_ids_set.is_empty() {
1046 if *level_idx == 0 {
1047 for level in &mut self.l0.sub_levels {
1048 level_delete_ssts(level, delete_sst_ids_set);
1049 }
1050 } else {
1051 let idx = *level_idx as usize - 1;
1052 level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
1053 }
1054 }
1055
1056 if !insert_table_infos.is_empty() {
1057 let insert_sst_level_id = *level_idx;
1058 let insert_sub_level_id = *l0_sub_level_id;
1059 if insert_sst_level_id == 0 {
1060 let l0 = &mut self.l0;
1061 let index = l0
1062 .sub_levels
1063 .partition_point(|level| level.sub_level_id < insert_sub_level_id);
1064 assert!(
1065 index < l0.sub_levels.len()
1066 && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
1067 "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
1068 insert_sub_level_id,
1069 delete_sst_ids_set,
1070 l0.sub_levels
1071 .iter()
1072 .map(|level| level.sub_level_id)
1073 .collect_vec()
1074 );
1075 if l0.sub_levels[index].table_infos.is_empty()
1076 && member_table_ids.len() == 1
1077 && insert_table_infos.iter().all(|sst| {
1078 sst.table_ids.len() == 1
1079 && sst.table_ids[0]
1080 == member_table_ids.iter().next().expect("non-empty").table_id
1081 })
1082 {
1083 l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
1086 }
1087 level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
1088 } else {
1089 let idx = insert_sst_level_id as usize - 1;
1090 if self.levels[idx].table_infos.is_empty()
1091 && insert_table_infos
1092 .iter()
1093 .all(|sst| sst.table_ids.len() == 1)
1094 {
1095 self.levels[idx].vnode_partition_count = new_vnode_partition_count;
1096 } else if self.levels[idx].vnode_partition_count != 0
1097 && new_vnode_partition_count == 0
1098 && member_table_ids.len() > 1
1099 {
1100 self.levels[idx].vnode_partition_count = 0;
1101 }
1102 level_insert_ssts(&mut self.levels[idx], insert_table_infos);
1103 }
1104 }
1105 }
1106
1107 pub(crate) fn post_apply_l0_compact(&mut self) {
1108 {
1109 self.l0
1110 .sub_levels
1111 .retain(|level| !level.table_infos.is_empty());
1112 self.l0.total_file_size = self
1113 .l0
1114 .sub_levels
1115 .iter()
1116 .map(|level| level.total_file_size)
1117 .sum::<u64>();
1118 self.l0.uncompressed_file_size = self
1119 .l0
1120 .sub_levels
1121 .iter()
1122 .map(|level| level.uncompressed_file_size)
1123 .sum::<u64>();
1124 }
1125 }
1126}
1127
1128impl<T, L> HummockVersionCommon<T, L> {
1129 pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1130 self.levels
1131 .values()
1132 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1133 }
1134}
1135
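/// Builds empty `Levels` for a newly created compaction group: `max_level` non-overlapping
/// levels plus an empty L0.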
pub fn build_initial_compaction_group_levels(
    group_id: CompactionGroupId,
    compaction_config: &CompactionConfig,
) -> Levels {
    let mut levels = vec![];
    for l in 0..compaction_config.get_max_level() {
        levels.push(Level {
            level_idx: (l + 1) as u32,
            level_type: PbLevelType::Nonoverlapping,
            table_infos: vec![],
            total_file_size: 0,
            sub_level_id: 0,
            uncompressed_file_size: 0,
            vnode_partition_count: 0,
        });
    }
    #[expect(deprecated)]
    Levels {
        levels,
        l0: OverlappingLevel {
            sub_levels: vec![],
            total_file_size: 0,
            uncompressed_file_size: 0,
        },
        group_id,
        parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
        member_table_ids: vec![],
        compaction_group_version_id: 0,
    }
}
1166
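/// Branches every SST in `level` that contains any of `member_table_ids`: the level keeps the
/// modified original while the branched counterparts (with fresh SST ids) are returned for
/// insertion into the new group, splitting each original `sst_size` evenly between the two.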
1167fn split_sst_info_for_level(
1168 member_table_ids: &BTreeSet<u32>,
1169 level: &mut Level,
1170 new_sst_id: &mut HummockSstableId,
1171) -> Vec<SstableInfo> {
1172 let mut insert_table_infos = vec![];
1175 for sst_info in &mut level.table_infos {
1176 let removed_table_ids = sst_info
1177 .table_ids
1178 .iter()
1179 .filter(|table_id| member_table_ids.contains(table_id))
1180 .cloned()
1181 .collect_vec();
1182 let sst_size = sst_info.sst_size;
1183 if sst_size / 2 == 0 {
1184 tracing::warn!(
1185 id = %sst_info.sst_id,
1186 object_id = %sst_info.object_id,
1187 sst_size = sst_info.sst_size,
1188 file_size = sst_info.file_size,
1189 "Sstable sst_size is under expected",
1190 );
1191 };
1192 if !removed_table_ids.is_empty() {
1193 let (modified_sst, branch_sst) = split_sst_with_table_ids(
1194 sst_info,
1195 new_sst_id,
1196 sst_size / 2,
1197 sst_size / 2,
1198 member_table_ids.iter().cloned().collect_vec(),
1199 );
1200 *sst_info = modified_sst;
1201 insert_table_infos.push(branch_sst);
1202 }
1203 }
1204 insert_table_infos
1205}
1206
1207pub fn get_compaction_group_ids(
1209 version: &HummockVersion,
1210) -> impl Iterator<Item = CompactionGroupId> + '_ {
1211 version.levels.keys().cloned()
1212}
1213
1214pub fn get_table_compaction_group_id_mapping(
1215 version: &HummockVersion,
1216) -> HashMap<StateTableId, CompactionGroupId> {
1217 version
1218 .state_table_info
1219 .info()
1220 .iter()
1221 .map(|(table_id, info)| (table_id.table_id, info.compaction_group_id))
1222 .collect()
1223}
1224
1225pub fn get_compaction_group_ssts(
1227 version: &HummockVersion,
1228 group_id: CompactionGroupId,
1229) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1230 let group_levels = version.get_compaction_group_levels(group_id);
1231 group_levels
1232 .l0
1233 .sub_levels
1234 .iter()
1235 .rev()
1236 .chain(group_levels.levels.iter())
1237 .flat_map(|level| {
1238 level
1239 .table_infos
1240 .iter()
1241 .map(|table_info| (table_info.object_id, table_info.sst_id))
1242 })
1243}
1244
1245pub fn new_sub_level(
1246 sub_level_id: u64,
1247 level_type: PbLevelType,
1248 table_infos: Vec<SstableInfo>,
1249) -> Level {
1250 if level_type == PbLevelType::Nonoverlapping {
1251 debug_assert!(
1252 can_concat(&table_infos),
1253 "sst of non-overlapping level is not concat-able: {:?}",
1254 table_infos
1255 );
1256 }
1257 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1258 let uncompressed_file_size = table_infos
1259 .iter()
1260 .map(|table| table.uncompressed_file_size)
1261 .sum();
1262 Level {
1263 level_idx: 0,
1264 level_type,
1265 table_infos,
1266 total_file_size,
1267 sub_level_id,
1268 uncompressed_file_size,
1269 vnode_partition_count: 0,
1270 }
1271}
1272
1273pub fn add_ssts_to_sub_level(
1274 l0: &mut OverlappingLevel,
1275 sub_level_idx: usize,
1276 insert_table_infos: Vec<SstableInfo>,
1277) {
1278 insert_table_infos.iter().for_each(|sst| {
1279 l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1280 l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1281 l0.total_file_size += sst.sst_size;
1282 l0.uncompressed_file_size += sst.uncompressed_file_size;
1283 });
1284 l0.sub_levels[sub_level_idx]
1285 .table_infos
1286 .extend(insert_table_infos);
1287 if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1288 l0.sub_levels[sub_level_idx]
1289 .table_infos
1290 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1291 assert!(
1292 can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1293 "sstable ids: {:?}",
1294 l0.sub_levels[sub_level_idx]
1295 .table_infos
1296 .iter()
1297 .map(|sst| sst.sst_id)
1298 .collect_vec()
1299 );
1300 }
1301}
1302
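/// Inserts a new L0 sub level with the given id and SSTs at `sub_level_insert_hint` (or appends
/// it as the newest sub level when no hint is given) and updates the L0 size statistics.
/// A `sub_level_id` of `u64::MAX` is treated as invalid and ignored.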
1303pub fn insert_new_sub_level(
1305 l0: &mut OverlappingLevel,
1306 insert_sub_level_id: u64,
1307 level_type: PbLevelType,
1308 insert_table_infos: Vec<SstableInfo>,
1309 sub_level_insert_hint: Option<usize>,
1310) {
1311 if insert_sub_level_id == u64::MAX {
1312 return;
1313 }
1314 let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1315 insert_pos
1316 } else {
1317 if let Some(newest_level) = l0.sub_levels.last() {
1318 assert!(
1319 newest_level.sub_level_id < insert_sub_level_id,
1320 "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1321 newest_level.sub_level_id,
1322 insert_sub_level_id,
1323 l0,
1324 );
1325 }
1326 l0.sub_levels.len()
1327 };
1328 #[cfg(debug_assertions)]
1329 {
1330 if insert_pos > 0 {
1331 if let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1) {
1332 debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1333 }
1334 }
1335 if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1336 debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1337 }
1338 }
1339 let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1342 l0.total_file_size += level.total_file_size;
1343 l0.uncompressed_file_size += level.uncompressed_file_size;
1344 l0.sub_levels.insert(insert_pos, level);
1345}
1346
1347fn level_delete_ssts(
1351 operand: &mut Level,
1352 delete_sst_ids_superset: &HashSet<HummockSstableId>,
1353) -> bool {
1354 let original_len = operand.table_infos.len();
1355 operand
1356 .table_infos
1357 .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1358 operand.total_file_size = operand
1359 .table_infos
1360 .iter()
1361 .map(|table| table.sst_size)
1362 .sum::<u64>();
1363 operand.uncompressed_file_size = operand
1364 .table_infos
1365 .iter()
1366 .map(|table| table.uncompressed_file_size)
1367 .sum::<u64>();
1368 original_len != operand.table_infos.len()
1369}
1370
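/// Inserts `insert_table_infos` into `operand`, keeping `table_infos` sorted by key range and
/// concatenable; an overlapping level is converted to non-overlapping in the process.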
1371fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1372 fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1373 format!(
1374 "sstable ids: {:?}",
1375 ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1376 )
1377 }
1378 operand.total_file_size += insert_table_infos
1379 .iter()
1380 .map(|sst| sst.sst_size)
1381 .sum::<u64>();
1382 operand.uncompressed_file_size += insert_table_infos
1383 .iter()
1384 .map(|sst| sst.uncompressed_file_size)
1385 .sum::<u64>();
1386 if operand.level_type == PbLevelType::Overlapping {
1387 operand.level_type = PbLevelType::Nonoverlapping;
1388 operand
1389 .table_infos
1390 .extend(insert_table_infos.iter().cloned());
1391 operand
1392 .table_infos
1393 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1394 assert!(
1395 can_concat(&operand.table_infos),
1396 "{}",
1397 display_sstable_infos(&operand.table_infos)
1398 );
1399 } else if !insert_table_infos.is_empty() {
1400 let sorted_insert: Vec<_> = insert_table_infos
1401 .iter()
1402 .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1403 .cloned()
1404 .collect();
1405 let first = &sorted_insert[0];
1406 let last = &sorted_insert[sorted_insert.len() - 1];
1407 let pos = operand
1408 .table_infos
1409 .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1410 if pos >= operand.table_infos.len()
1411 || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1412 {
1413 operand.table_infos.splice(pos..pos, sorted_insert);
1414 let validate_range = operand
1416 .table_infos
1417 .iter()
1418 .skip(pos.saturating_sub(1))
1419 .take(insert_table_infos.len() + 2)
1420 .collect_vec();
1421 assert!(
1422 can_concat(&validate_range),
1423 "{}",
1424 display_sstable_infos(&validate_range),
1425 );
1426 } else {
1427 warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1430 for i in insert_table_infos {
1431 let pos = operand
1432 .table_infos
1433 .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1434 operand.table_infos.insert(pos, i.clone());
1435 }
1436 assert!(
1437 can_concat(&operand.table_infos),
1438 "{}",
1439 display_sstable_infos(&operand.table_infos)
1440 );
1441 }
1442 }
1443}
1444
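/// Returns the file size of every object referenced by `version`, covering both the LSM levels
/// and the table change logs.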
1445pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
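    // This no-op match fails to compile when a new `HummockObjectId` variant is added, as a
    // reminder to also account for the new object type here.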
1446 match HummockObjectId::Sstable(0.into()) {
1450 HummockObjectId::Sstable(_) => {}
1451 };
1452 version
1453 .levels
1454 .values()
1455 .flat_map(|cg| {
1456 cg.level0()
1457 .sub_levels
1458 .iter()
1459 .chain(cg.levels.iter())
1460 .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1461 })
1462 .chain(version.table_change_log.values().flat_map(|c| {
1463 c.iter().flat_map(|l| {
1464 l.old_value
1465 .iter()
1466 .chain(l.new_value.iter())
1467 .map(|t| (t.object_id, t.file_size))
1468 })
1469 }))
1470 .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1471 .collect()
1472}
1473
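/// Checks structural invariants of `version` (consistent group ids, level indices, ordered
/// sub-level ids, sorted table ids, and non-overlapping key ranges in non-overlapping levels)
/// and returns one message per violation found.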
1474pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1477 let mut res = Vec::new();
1478 for (group_id, levels) in &version.levels {
1480 if levels.group_id != *group_id {
1482 res.push(format!(
1483 "GROUP {}: inconsistent group id {} in Levels",
1484 group_id, levels.group_id
1485 ));
1486 }
1487
1488 let validate_level = |group: CompactionGroupId,
1489 expected_level_idx: u32,
1490 level: &Level,
1491 res: &mut Vec<String>| {
1492 let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1493 if level.level_idx == 0 {
                level_identifier.push_str(format!(" SUBLEVEL {}", level.sub_level_id).as_str());
1495 if level.table_infos.is_empty() {
1497 res.push(format!("{}: empty level", level_identifier));
1498 }
1499 } else if level.level_type != PbLevelType::Nonoverlapping {
1500 res.push(format!(
1502 "{}: level type {:?} is not non-overlapping",
1503 level_identifier, level.level_type
1504 ));
1505 }
1506
1507 if level.level_idx != expected_level_idx {
1509 res.push(format!(
1510 "{}: mismatched level idx {}",
1511 level_identifier, expected_level_idx
1512 ));
1513 }
1514
1515 let mut prev_table_info: Option<&SstableInfo> = None;
1516 for table_info in &level.table_infos {
1517 if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1519 res.push(format!(
1520 "{} SST {}: table_ids not sorted",
1521 level_identifier, table_info.object_id
1522 ));
1523 }
1524
1525 if level.level_type == PbLevelType::Nonoverlapping {
1527 if let Some(prev) = prev_table_info.take() {
1528 if prev
1529 .key_range
1530 .compare_right_with(&table_info.key_range.left)
1531 != Ordering::Less
1532 {
1533 res.push(format!(
1534 "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1535 level_identifier, table_info.object_id, prev, table_info
1536 ));
1537 }
1538 }
1539 let _ = prev_table_info.insert(table_info);
1540 }
1541 }
1542 };
1543
1544 let l0 = &levels.l0;
1545 let mut prev_sub_level_id = u64::MAX;
1546 for sub_level in &l0.sub_levels {
1547 if sub_level.sub_level_id >= prev_sub_level_id {
                res.push(format!(
                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
                    group_id, sub_level.sub_level_id, prev_sub_level_id
                ));
1553 }
1554 prev_sub_level_id = sub_level.sub_level_id;
1555
1556 validate_level(*group_id, 0, sub_level, &mut res);
1557 }
1558
1559 for idx in 1..=levels.levels.len() {
1560 validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1561 }
1562 }
1563 res
1564}
1565
1566#[cfg(test)]
1567mod tests {
1568 use std::collections::{HashMap, HashSet};
1569
1570 use bytes::Bytes;
1571 use risingwave_common::catalog::TableId;
1572 use risingwave_common::hash::VirtualNode;
1573 use risingwave_common::util::epoch::test_epoch;
1574 use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1575
1576 use super::group_split;
1577 use crate::HummockVersionId;
1578 use crate::compaction_group::group_split::*;
1579 use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1580 use crate::key::{FullKey, gen_key_from_str};
1581 use crate::key_range::KeyRange;
1582 use crate::level::{Level, Levels, OverlappingLevel};
1583 use crate::sstable_info::{SstableInfo, SstableInfoInner};
1584 use crate::version::{
1585 GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1586 };
1587
1588 fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1589 gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1590 }
1591
1592 fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1593 let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1594 let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1595 let full_key_l = FullKey::for_test(
1596 TableId::new(*table_ids.first().unwrap()),
1597 table_key_l,
1598 epoch,
1599 )
1600 .encode();
1601 let full_key_r =
1602 FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1603 .encode();
1604
1605 SstableInfoInner {
1606 sst_id: sst_id.into(),
1607 key_range: KeyRange {
1608 left: full_key_l.into(),
1609 right: full_key_r.into(),
1610 right_exclusive: false,
1611 },
1612 table_ids,
1613 object_id: sst_id.into(),
1614 min_epoch: 20,
1615 max_epoch: 20,
1616 file_size: 100,
1617 sst_size: 100,
1618 ..Default::default()
1619 }
1620 }
1621
1622 #[test]
1623 fn test_get_sst_object_ids() {
1624 let mut version = HummockVersion {
1625 id: HummockVersionId::new(0),
1626 levels: HashMap::from_iter([(
1627 0,
1628 Levels {
1629 levels: vec![],
1630 l0: OverlappingLevel {
1631 sub_levels: vec![],
1632 total_file_size: 0,
1633 uncompressed_file_size: 0,
1634 },
1635 ..Default::default()
1636 },
1637 )]),
1638 ..Default::default()
1639 };
1640 assert_eq!(version.get_object_ids(false).len(), 0);
1641
1642 version
1644 .levels
1645 .get_mut(&0)
1646 .unwrap()
1647 .l0
1648 .sub_levels
1649 .push(Level {
1650 table_infos: vec![
1651 SstableInfoInner {
1652 object_id: 11.into(),
1653 sst_id: 11.into(),
1654 ..Default::default()
1655 }
1656 .into(),
1657 ],
1658 ..Default::default()
1659 });
1660 assert_eq!(version.get_object_ids(false).len(), 1);
1661
1662 version.levels.get_mut(&0).unwrap().levels.push(Level {
1664 table_infos: vec![
1665 SstableInfoInner {
1666 object_id: 22.into(),
1667 sst_id: 22.into(),
1668 ..Default::default()
1669 }
1670 .into(),
1671 ],
1672 ..Default::default()
1673 });
1674 assert_eq!(version.get_object_ids(false).len(), 2);
1675 }
1676
1677 #[test]
1678 fn test_apply_version_delta() {
1679 let mut version = HummockVersion {
1680 id: HummockVersionId::new(0),
1681 levels: HashMap::from_iter([
1682 (
1683 0,
1684 build_initial_compaction_group_levels(
1685 0,
1686 &CompactionConfig {
1687 max_level: 6,
1688 ..Default::default()
1689 },
1690 ),
1691 ),
1692 (
1693 1,
1694 build_initial_compaction_group_levels(
1695 1,
1696 &CompactionConfig {
1697 max_level: 6,
1698 ..Default::default()
1699 },
1700 ),
1701 ),
1702 ]),
1703 ..Default::default()
1704 };
1705 let version_delta = HummockVersionDelta {
1706 id: HummockVersionId::new(1),
1707 group_deltas: HashMap::from_iter([
1708 (
1709 2,
1710 GroupDeltas {
1711 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1712 group_config: Some(CompactionConfig {
1713 max_level: 6,
1714 ..Default::default()
1715 }),
1716 ..Default::default()
1717 }))],
1718 },
1719 ),
1720 (
1721 0,
1722 GroupDeltas {
1723 group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1724 },
1725 ),
1726 (
1727 1,
1728 GroupDeltas {
1729 group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1730 1,
1731 0,
1732 HashSet::new(),
1733 vec![
1734 SstableInfoInner {
1735 object_id: 1.into(),
1736 sst_id: 1.into(),
1737 ..Default::default()
1738 }
1739 .into(),
1740 ],
1741 0,
1742 version
1743 .levels
1744 .get(&1)
1745 .as_ref()
1746 .unwrap()
1747 .compaction_group_version_id,
1748 ))],
1749 },
1750 ),
1751 ]),
1752 ..Default::default()
1753 };
1754 let version_delta = version_delta;
1755
1756 version.apply_version_delta(&version_delta);
1757 let mut cg1 = build_initial_compaction_group_levels(
1758 1,
1759 &CompactionConfig {
1760 max_level: 6,
1761 ..Default::default()
1762 },
1763 );
1764 cg1.levels[0] = Level {
1765 level_idx: 1,
1766 level_type: LevelType::Nonoverlapping,
1767 table_infos: vec![
1768 SstableInfoInner {
1769 object_id: 1.into(),
1770 sst_id: 1.into(),
1771 ..Default::default()
1772 }
1773 .into(),
1774 ],
1775 ..Default::default()
1776 };
1777 assert_eq!(
1778 version,
1779 HummockVersion {
1780 id: HummockVersionId::new(1),
1781 levels: HashMap::from_iter([
1782 (
1783 2,
1784 build_initial_compaction_group_levels(
1785 2,
1786 &CompactionConfig {
1787 max_level: 6,
1788 ..Default::default()
1789 },
1790 ),
1791 ),
1792 (1, cg1),
1793 ]),
1794 ..Default::default()
1795 }
1796 );
1797 }
1798
1799 fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1800 gen_sst_info_impl(object_id, table_ids, left, right).into()
1801 }
1802
1803 fn gen_sst_info_impl(
1804 object_id: u64,
1805 table_ids: Vec<u32>,
1806 left: Bytes,
1807 right: Bytes,
1808 ) -> SstableInfoInner {
1809 SstableInfoInner {
1810 object_id: object_id.into(),
1811 sst_id: object_id.into(),
1812 key_range: KeyRange {
1813 left,
1814 right,
1815 right_exclusive: false,
1816 },
1817 table_ids,
1818 file_size: 100,
1819 sst_size: 100,
1820 uncompressed_file_size: 100,
1821 ..Default::default()
1822 }
1823 }
1824
1825 #[test]
1826 fn test_merge_levels() {
1827 let mut left_levels = build_initial_compaction_group_levels(
1828 1,
1829 &CompactionConfig {
1830 max_level: 6,
1831 ..Default::default()
1832 },
1833 );
1834
1835 let mut right_levels = build_initial_compaction_group_levels(
1836 2,
1837 &CompactionConfig {
1838 max_level: 6,
1839 ..Default::default()
1840 },
1841 );
1842
1843 left_levels.levels[0] = Level {
1844 level_idx: 1,
1845 level_type: LevelType::Nonoverlapping,
1846 table_infos: vec![
1847 gen_sst_info(
1848 1,
1849 vec![3],
1850 FullKey::for_test(
1851 TableId::new(3),
1852 gen_key_from_str(VirtualNode::from_index(1), "1"),
1853 0,
1854 )
1855 .encode()
1856 .into(),
1857 FullKey::for_test(
1858 TableId::new(3),
1859 gen_key_from_str(VirtualNode::from_index(200), "1"),
1860 0,
1861 )
1862 .encode()
1863 .into(),
1864 ),
1865 gen_sst_info(
1866 10,
1867 vec![3, 4],
1868 FullKey::for_test(
1869 TableId::new(3),
1870 gen_key_from_str(VirtualNode::from_index(201), "1"),
1871 0,
1872 )
1873 .encode()
1874 .into(),
1875 FullKey::for_test(
1876 TableId::new(4),
1877 gen_key_from_str(VirtualNode::from_index(10), "1"),
1878 0,
1879 )
1880 .encode()
1881 .into(),
1882 ),
1883 gen_sst_info(
1884 11,
1885 vec![4],
1886 FullKey::for_test(
1887 TableId::new(4),
1888 gen_key_from_str(VirtualNode::from_index(11), "1"),
1889 0,
1890 )
1891 .encode()
1892 .into(),
1893 FullKey::for_test(
1894 TableId::new(4),
1895 gen_key_from_str(VirtualNode::from_index(200), "1"),
1896 0,
1897 )
1898 .encode()
1899 .into(),
1900 ),
1901 ],
1902 total_file_size: 300,
1903 ..Default::default()
1904 };
1905
1906 left_levels.l0.sub_levels.push(Level {
1907 level_idx: 0,
1908 table_infos: vec![gen_sst_info(
1909 3,
1910 vec![3],
1911 FullKey::for_test(
1912 TableId::new(3),
1913 gen_key_from_str(VirtualNode::from_index(1), "1"),
1914 0,
1915 )
1916 .encode()
1917 .into(),
1918 FullKey::for_test(
1919 TableId::new(3),
1920 gen_key_from_str(VirtualNode::from_index(200), "1"),
1921 0,
1922 )
1923 .encode()
1924 .into(),
1925 )],
1926 sub_level_id: 101,
1927 level_type: LevelType::Overlapping,
1928 total_file_size: 100,
1929 ..Default::default()
1930 });
1931
1932 left_levels.l0.sub_levels.push(Level {
1933 level_idx: 0,
1934 table_infos: vec![gen_sst_info(
1935 3,
1936 vec![3],
1937 FullKey::for_test(
1938 TableId::new(3),
1939 gen_key_from_str(VirtualNode::from_index(1), "1"),
1940 0,
1941 )
1942 .encode()
1943 .into(),
1944 FullKey::for_test(
1945 TableId::new(3),
1946 gen_key_from_str(VirtualNode::from_index(200), "1"),
1947 0,
1948 )
1949 .encode()
1950 .into(),
1951 )],
1952 sub_level_id: 103,
1953 level_type: LevelType::Overlapping,
1954 total_file_size: 100,
1955 ..Default::default()
1956 });
1957
1958 left_levels.l0.sub_levels.push(Level {
1959 level_idx: 0,
1960 table_infos: vec![gen_sst_info(
1961 3,
1962 vec![3],
1963 FullKey::for_test(
1964 TableId::new(3),
1965 gen_key_from_str(VirtualNode::from_index(1), "1"),
1966 0,
1967 )
1968 .encode()
1969 .into(),
1970 FullKey::for_test(
1971 TableId::new(3),
1972 gen_key_from_str(VirtualNode::from_index(200), "1"),
1973 0,
1974 )
1975 .encode()
1976 .into(),
1977 )],
1978 sub_level_id: 105,
1979 level_type: LevelType::Nonoverlapping,
1980 total_file_size: 100,
1981 ..Default::default()
1982 });
1983
1984 right_levels.levels[0] = Level {
1985 level_idx: 1,
1986 level_type: LevelType::Nonoverlapping,
1987 table_infos: vec![
1988 gen_sst_info(
1989 1,
1990 vec![5],
1991 FullKey::for_test(
1992 TableId::new(5),
1993 gen_key_from_str(VirtualNode::from_index(1), "1"),
1994 0,
1995 )
1996 .encode()
1997 .into(),
1998 FullKey::for_test(
1999 TableId::new(5),
2000 gen_key_from_str(VirtualNode::from_index(200), "1"),
2001 0,
2002 )
2003 .encode()
2004 .into(),
2005 ),
2006 gen_sst_info(
2007 10,
2008 vec![5, 6],
2009 FullKey::for_test(
2010 TableId::new(5),
2011 gen_key_from_str(VirtualNode::from_index(201), "1"),
2012 0,
2013 )
2014 .encode()
2015 .into(),
2016 FullKey::for_test(
2017 TableId::new(6),
2018 gen_key_from_str(VirtualNode::from_index(10), "1"),
2019 0,
2020 )
2021 .encode()
2022 .into(),
2023 ),
2024 gen_sst_info(
2025 11,
2026 vec![6],
2027 FullKey::for_test(
2028 TableId::new(6),
2029 gen_key_from_str(VirtualNode::from_index(11), "1"),
2030 0,
2031 )
2032 .encode()
2033 .into(),
2034 FullKey::for_test(
2035 TableId::new(6),
2036 gen_key_from_str(VirtualNode::from_index(200), "1"),
2037 0,
2038 )
2039 .encode()
2040 .into(),
2041 ),
2042 ],
2043 total_file_size: 300,
2044 ..Default::default()
2045 };
2046
2047 right_levels.l0.sub_levels.push(Level {
2048 level_idx: 0,
2049 table_infos: vec![gen_sst_info(
2050 3,
2051 vec![5],
2052 FullKey::for_test(
2053 TableId::new(5),
2054 gen_key_from_str(VirtualNode::from_index(1), "1"),
2055 0,
2056 )
2057 .encode()
2058 .into(),
2059 FullKey::for_test(
2060 TableId::new(5),
2061 gen_key_from_str(VirtualNode::from_index(200), "1"),
2062 0,
2063 )
2064 .encode()
2065 .into(),
2066 )],
2067 sub_level_id: 101,
2068 level_type: LevelType::Overlapping,
2069 total_file_size: 100,
2070 ..Default::default()
2071 });
2072
2073 right_levels.l0.sub_levels.push(Level {
2074 level_idx: 0,
2075 table_infos: vec![gen_sst_info(
2076 5,
2077 vec![5],
2078 FullKey::for_test(
2079 TableId::new(5),
2080 gen_key_from_str(VirtualNode::from_index(1), "1"),
2081 0,
2082 )
2083 .encode()
2084 .into(),
2085 FullKey::for_test(
2086 TableId::new(5),
2087 gen_key_from_str(VirtualNode::from_index(200), "1"),
2088 0,
2089 )
2090 .encode()
2091 .into(),
2092 )],
2093 sub_level_id: 102,
2094 level_type: LevelType::Overlapping,
2095 total_file_size: 100,
2096 ..Default::default()
2097 });
2098
2099 right_levels.l0.sub_levels.push(Level {
2100 level_idx: 0,
2101 table_infos: vec![gen_sst_info(
2102 3,
2103 vec![5],
2104 FullKey::for_test(
2105 TableId::new(5),
2106 gen_key_from_str(VirtualNode::from_index(1), "1"),
2107 0,
2108 )
2109 .encode()
2110 .into(),
2111 FullKey::for_test(
2112 TableId::new(5),
2113 gen_key_from_str(VirtualNode::from_index(200), "1"),
2114 0,
2115 )
2116 .encode()
2117 .into(),
2118 )],
2119 sub_level_id: 103,
2120 level_type: LevelType::Nonoverlapping,
2121 total_file_size: 100,
2122 ..Default::default()
2123 });
2124
2125 {
2126 let mut left_levels = Levels::default();
2128 let right_levels = Levels::default();
2129
2130 group_split::merge_levels(&mut left_levels, right_levels);
2131 }
2132
2133 {
2134 let mut left_levels = build_initial_compaction_group_levels(
2136 1,
2137 &CompactionConfig {
2138 max_level: 6,
2139 ..Default::default()
2140 },
2141 );
2142 let right_levels = right_levels.clone();
2143
2144 group_split::merge_levels(&mut left_levels, right_levels);
2145
2146 assert!(left_levels.l0.sub_levels.len() == 3);
2147 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2148 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2149 assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2150 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2151 assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2152 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2153
2154 assert!(left_levels.levels[0].level_idx == 1);
2155 assert_eq!(300, left_levels.levels[0].total_file_size);
2156 }
2157
        {
            let mut left_levels = left_levels.clone();
            let right_levels = build_initial_compaction_group_levels(
                2,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

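        // Merge two populated groups: the right group's l0 sub-levels are appended after the
        // left's and receive new sub_level_ids (106..=108), and the bottom-level sizes add up.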
        {
            let mut left_levels = left_levels.clone();
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 6);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(600, left_levels.levels[0].total_file_size);
        }
    }

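    // `get_split_pos` returns the index of the SST that the split key falls into: the split key
    // for table 4 lands in `s2`, so the position is 1; an empty input yields 0.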
    #[test]
    fn test_get_split_pos() {
        let epoch = test_epoch(1);
        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
        let s3 = gen_sstable_info(3, vec![6, 7], epoch);

        let ssts = vec![s1, s2, s3];
        let split_key = group_split::build_split_key(4, VirtualNode::ZERO);

        let pos = group_split::get_split_pos(&ssts, split_key.clone());
        assert_eq!(1, pos);

        let pos = group_split::get_split_pos(&vec![], split_key);
        assert_eq!(0, pos);
    }

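    // Split a single SST covering tables [1, 2, 3, 5] at various table boundaries and check the
    // resulting key ranges, table-id partitioning and newly assigned sst ids.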
    #[test]
    fn test_split_sst() {
        let epoch = test_epoch(1);
        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);

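        // Split at the start of table 3: both halves exist, the branched (right) part starts at
        // table 3 and gets the first new sst id (10), while the left part gets 11.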
        {
            let split_key = group_split::build_split_key(3, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(
                origin_sst
                    .key_range
                    .right
                    .cmp(&branched_sst.key_range.left)
                    .is_le()
            );
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(&3, branched_sst.table_ids.first().unwrap());
        }

        {
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(&5, branched_sst.table_ids.first().unwrap());
        }

        {
            let split_key = group_split::build_split_key(6, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Left, split_type);
        }

        {
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Both, split_type);

            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Right, split_type);
        }

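        // An SST whose key range is a single point: splitting at that key leaves no left part,
        // only the branched right part.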
        {
            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
            sst.key_range.right = sst.key_range.left.clone();
            let sst: SstableInfo = sst.into();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            assert!(origin_sst.is_none());
            assert!(branched_sst.is_some());
        }
    }

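    // Split the SSTs of a whole level with `split_sst_info_for_level_v2`, covering split keys
    // that fall before, inside, and after the level's key range.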
    #[test]
    fn test_split_sst_info_for_level() {
        let mut version = HummockVersion {
            id: HummockVersionId(0),
            levels: HashMap::from_iter([(
                1,
                build_initial_compaction_group_levels(
                    1,
                    &CompactionConfig {
                        max_level: 6,
                        ..Default::default()
                    },
                ),
            )]),
            ..Default::default()
        };

        let cg1 = version.levels.get_mut(&1).unwrap();

        cg1.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![3],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![3, 4],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![4],
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

        cg1.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![
                gen_sst_info(
                    2,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    22,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    23,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    24,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    25,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 300,
            ..Default::default()
        });

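        // Split the overlapping l0 sub-level at the start of table 1, wrap the extracted
        // right-hand SSTs in a new `Levels`, and merge them back into the group.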
        {
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.l0.sub_levels[0],
                &mut new_sst_id,
                split_key,
            );
            let mut right_l0 = OverlappingLevel {
                sub_levels: vec![],
                total_file_size: 0,
                uncompressed_file_size: 0,
            };

            right_l0.sub_levels.push(Level {
                level_idx: 0,
                table_infos: x,
                sub_level_id: 101,
                total_file_size: 100,
                level_type: LevelType::Overlapping,
                ..Default::default()
            });

            let right_levels = Levels {
                levels: vec![],
                l0: right_l0,
                ..Default::default()
            };

            merge_levels(cg1, right_levels);
        }

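        // Splitting an empty level extracts nothing.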
        {
            let mut new_sst_id = 100.into();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[2],
                &mut new_sst_id,
                split_key,
            );

            assert!(x.is_empty());
        }

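        // The split key precedes every SST in the level (tables 3 and 4), so all three SSTs move
        // to the right side unchanged and the level is left empty.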
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(3, x.len());
            assert_eq!(1, x[0].sst_id);
            assert_eq!(100, x[0].sst_size);
            assert_eq!(10, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(11, x[2].sst_id);
            assert_eq!(100, x[2].sst_size);

            assert_eq!(0, cg1.levels[0].table_infos.len());
        }

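        // The split key (table 5) is past every SST in the level, so nothing is extracted.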
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(5, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(0, x.len());
            assert_eq!(3, cg1.levels[0].table_infos.len());
        }

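        // The split key at table 4 cuts through sst 10 (tables [3, 4]): its right half (new id
        // 100, half the size) plus sst 11 go right, while the left half (new id 101, table 3)
        // stays in the level.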
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(2, x.len());
            assert_eq!(100, x[0].sst_id);
            assert_eq!(100 / 2, x[0].sst_size);
            assert_eq!(11, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(vec![4], x[1].table_ids);

            assert_eq!(2, cg1.levels[0].table_infos.len());
            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
            assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
        }
    }
}