1use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_pb::hummock::{
27 CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
28};
29use tracing::warn;
30
31use super::group_split::split_sst_with_table_ids;
32use super::{StateTableId, group_split};
33use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
34use crate::compact_task::is_compaction_task_expired;
35use crate::compaction_group::StaticCompactionGroupId;
36use crate::key_range::KeyRangeCommon;
37use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
38use crate::sstable_info::SstableInfo;
39use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
40use crate::version::{
41 GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
42 IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
43};
44use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId, can_concat};
45
/// Summary of the sst changes that a version delta applies to one compaction
/// group: the ssts it inserts (and the level they go into) and the sst
/// objects it deletes. Built by `build_sst_delta_infos`.
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    // Level index that `insert_sst_infos` are inserted into (0 for L0).
    pub insert_sst_level: u32,
    // Ssts newly added by the delta.
    pub insert_sst_infos: Vec<SstableInfo>,
    // Object ids of the ssts removed by the delta.
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}
52
/// For one branched sst object: the branch sst ids it has in each compaction group.
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
54
55impl<L> HummockVersionCommon<SstableInfo, L> {
56 pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
57 self.levels
58 .get(&compaction_group_id)
59 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
60 }
61
62 pub fn get_compaction_group_levels_mut(
63 &mut self,
64 compaction_group_id: CompactionGroupId,
65 ) -> &mut Levels {
66 self.levels
67 .get_mut(&compaction_group_id)
68 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
69 }
70
71 pub fn get_sst_ids_by_group_id(
73 &self,
74 compaction_group_id: CompactionGroupId,
75 ) -> impl Iterator<Item = u64> + '_ {
76 self.levels
77 .iter()
78 .filter_map(move |(cg_id, level)| {
79 if *cg_id == compaction_group_id {
80 Some(level)
81 } else {
82 None
83 }
84 })
85 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
86 .flat_map(|level| level.table_infos.iter())
87 .map(|s| s.sst_id)
88 }
89
90 pub fn level_iter<F: FnMut(&Level) -> bool>(
91 &self,
92 compaction_group_id: CompactionGroupId,
93 mut f: F,
94 ) {
95 if let Some(levels) = self.levels.get(&compaction_group_id) {
96 for sub_level in &levels.l0.sub_levels {
97 if !f(sub_level) {
98 return;
99 }
100 }
101 for level in &levels.levels {
102 if !f(level) {
103 return;
104 }
105 }
106 }
107 }
108
109 pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
110 self.levels
112 .get(&compaction_group_id)
113 .map(|group| group.levels.len() + 1)
114 .unwrap_or(0)
115 }
116
117 pub fn safe_epoch_table_watermarks(
118 &self,
119 existing_table_ids: &[u32],
120 ) -> BTreeMap<u32, TableWatermarks> {
121 safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
122 }
123}
124
125pub fn safe_epoch_table_watermarks_impl(
126 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
127 existing_table_ids: &[u32],
128) -> BTreeMap<u32, TableWatermarks> {
129 fn extract_single_table_watermark(
130 table_watermarks: &TableWatermarks,
131 ) -> Option<TableWatermarks> {
132 if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
133 Some(TableWatermarks {
134 watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
135 direction: table_watermarks.direction,
136 watermark_type: table_watermarks.watermark_type,
137 })
138 } else {
139 None
140 }
141 }
142 table_watermarks
143 .iter()
144 .filter_map(|(table_id, table_watermarks)| {
145 let u32_table_id = table_id.table_id();
146 if !existing_table_ids.contains(&u32_table_id) {
147 None
148 } else {
149 extract_single_table_watermark(table_watermarks)
150 .map(|table_watermarks| (table_id.table_id, table_watermarks))
151 }
152 })
153 .collect()
154}
155
156pub fn safe_epoch_read_table_watermarks_impl(
157 safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
158) -> BTreeMap<TableId, ReadTableWatermark> {
159 safe_epoch_watermarks
160 .into_iter()
161 .map(|(table_id, watermarks)| {
162 assert_eq!(watermarks.watermarks.len(), 1);
163 let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
164 let mut vnode_watermark_map = BTreeMap::new();
165 for vnode_watermark in vnode_watermarks.iter() {
166 let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
167 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
168 assert!(
169 vnode_watermark_map
170 .insert(vnode, watermark.clone())
171 .is_none(),
172 "duplicate table watermark on vnode {}",
173 vnode.to_index()
174 );
175 }
176 }
177 (
178 TableId::from(table_id),
179 ReadTableWatermark {
180 direction: watermarks.direction,
181 vnode_watermarks: vnode_watermark_map,
182 },
183 )
184 })
185 .collect()
186}
187
188impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
189 pub fn count_new_ssts_in_group_split(
190 &self,
191 parent_group_id: CompactionGroupId,
192 split_key: Bytes,
193 ) -> u64 {
194 self.levels
195 .get(&parent_group_id)
196 .map_or(0, |parent_levels| {
197 let l0 = &parent_levels.l0;
198 let mut split_count = 0;
199 for sub_level in &l0.sub_levels {
200 assert!(!sub_level.table_infos.is_empty());
201
202 if sub_level.level_type == PbLevelType::Overlapping {
203 split_count += sub_level
205 .table_infos
206 .iter()
207 .map(|sst| {
208 if let group_split::SstSplitType::Both =
209 group_split::need_to_split(sst, split_key.clone())
210 {
211 2
212 } else {
213 0
214 }
215 })
216 .sum::<u64>();
217 continue;
218 }
219
220 let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
221 let sst = sub_level.table_infos.get(pos).unwrap();
222
223 if let group_split::SstSplitType::Both =
224 group_split::need_to_split(sst, split_key.clone())
225 {
226 split_count += 2;
227 }
228 }
229
230 for level in &parent_levels.levels {
231 if level.table_infos.is_empty() {
232 continue;
233 }
234 let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
235 let sst = level.table_infos.get(pos).unwrap();
236 if let group_split::SstSplitType::Both =
237 group_split::need_to_split(sst, split_key.clone())
238 {
239 split_count += 2;
240 }
241 }
242
243 split_count
244 })
245 }
246
    /// Initializes the levels of the new group `group_id` by branching the
    /// ssts of `member_table_ids` out of the existing `parent_group_id`.
    ///
    /// Each parent sst containing any member table is split: the branch sst
    /// (ids allocated from `new_sst_start_id`) goes into the corresponding
    /// level of the new group, while the parent keeps the remainder. Parent
    /// ssts left with no table ids are dropped and the level/L0 size stats
    /// are adjusted accordingly.
    ///
    /// # Panics
    /// Panics (debug builds) on a non-zero `new_sst_start_id` for
    /// `NewCompactionGroup`; `unreachable!` for a non-existing parent group.
    pub fn init_with_parent_group(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        member_table_ids: BTreeSet<StateTableId>,
        new_sst_start_id: u64,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            // A brand-new group has no parent to split from; nothing to do.
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from",
                parent_group_id
            );
        }
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // Both groups change shape; bump their version ids so stale compaction
        // tasks can be detected (see `is_compaction_task_expired`).
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Branch the member tables' data out of each sst of this sub level.
                let insert_table_infos =
                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
                // Drop parent ssts that no longer hold any table, keeping the
                // sub level and L0 aggregate sizes consistent.
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                if insert_table_infos.is_empty() {
                    continue;
                }
                // Place the branch ssts into the matching sub level of the new
                // group, creating the sub level when it does not exist yet.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos =
                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Bottom levels must stay sorted and concatenatable by key range.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        // Invariant: no L0 sub level in either group may be left empty.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
364
    /// Computes, for each compaction group touched by `version_delta`, which
    /// ssts the delta inserts (and at which level) and which sst objects it
    /// deletes, resolving removed sst ids against the current version.
    ///
    /// Groups whose delta contains structural changes (construct/destroy/
    /// merge) are skipped; a trivial-move delta yields no infos at all.
    pub fn build_sst_delta_infos(
        &self,
        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
    ) -> Vec<SstDeltaInfo> {
        let mut infos = vec![];

        // A trivial move only relocates ssts; nothing is added or freed.
        if version_delta.trivial_move {
            return infos;
        }

        for (group_id, group_deltas) in &version_delta.group_deltas {
            let mut info = SstDeltaInfo::default();

            let mut removed_l0_ssts: BTreeSet<u64> = BTreeSet::new();
            let mut removed_ssts: BTreeMap<u32, BTreeSet<u64>> = BTreeMap::new();

            // Only summarize groups with pure L0-insert/intra-level deltas.
            if !group_deltas.group_deltas.iter().all(|delta| {
                matches!(
                    delta,
                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
                )
            }) {
                continue;
            }

            // Collect inserted ssts and the removed sst ids per level.
            for group_delta in &group_deltas.group_deltas {
                match group_delta {
                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                        if !inserted_table_infos.is_empty() {
                            info.insert_sst_level = 0;
                            info.insert_sst_infos
                                .extend(inserted_table_infos.iter().cloned());
                        }
                    }
                    GroupDeltaCommon::IntraLevel(intra_level) => {
                        if !intra_level.inserted_table_infos.is_empty() {
                            info.insert_sst_level = intra_level.level_idx;
                            info.insert_sst_infos
                                .extend(intra_level.inserted_table_infos.iter().cloned());
                        }
                        if !intra_level.removed_table_ids.is_empty() {
                            for id in &intra_level.removed_table_ids {
                                if intra_level.level_idx == 0 {
                                    removed_l0_ssts.insert(*id);
                                } else {
                                    removed_ssts
                                        .entry(intra_level.level_idx)
                                        .or_default()
                                        .insert(*id);
                                }
                            }
                        }
                    }
                    GroupDeltaCommon::GroupConstruct(_)
                    | GroupDeltaCommon::GroupDestroy(_)
                    | GroupDeltaCommon::GroupMerge(_) => {}
                }
            }

            // Resolve removed sst ids to object ids via the current version.
            let group = self.levels.get(group_id).unwrap();
            for l0_sub_level in &group.level0().sub_levels {
                for sst_info in &l0_sub_level.table_infos {
                    if removed_l0_ssts.remove(&sst_info.sst_id) {
                        info.delete_sst_object_ids.push(sst_info.object_id);
                    }
                }
            }
            for level in &group.levels {
                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
                    for sst_info in &level.table_infos {
                        if removed_level_ssts.remove(&sst_info.sst_id) {
                            info.delete_sst_object_ids.push(sst_info.object_id);
                        }
                    }
                    if !removed_level_ssts.is_empty() {
                        tracing::error!(
                            "removed_level_ssts is not empty: {:?}",
                            removed_level_ssts,
                        );
                    }
                    debug_assert!(removed_level_ssts.is_empty());
                }
            }

            // Every removed sst id should have been found in the version.
            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
                tracing::error!(
                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
                    removed_l0_ssts,
                    removed_ssts
                );
            }
            debug_assert!(removed_l0_ssts.is_empty());
            debug_assert!(removed_ssts.is_empty());

            infos.push(info);
        }

        infos
    }
470
    /// Applies `version_delta` in place, transitioning this version from
    /// `version_delta.prev_id` to `version_delta.id`.
    ///
    /// Updates, in order: per-table state info, per-group levels (group
    /// construct/destroy/merge, L0 inserts on commit, compaction results),
    /// table watermarks, and table change logs.
    ///
    /// # Panics
    /// Panics if `self.id != version_delta.prev_id`, or when the delta
    /// violates invariants (e.g. sst deletion while committing an epoch).
    pub fn apply_version_delta(
        &mut self,
        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
    ) {
        assert_eq!(self.id, version_delta.prev_id);

        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
            &version_delta.state_table_info_delta,
            &version_delta.removed_table_ids,
        );

        #[expect(deprecated)]
        {
            // Legacy compatibility: a bumped global max_committed_epoch also
            // marks this delta as a commit-epoch delta.
            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
                is_commit_epoch = true;
                tracing::trace!(
                    "max committed epoch bumped but no table committed epoch is changed"
                );
            }
        }

        // Apply the group deltas to each compaction group's levels.
        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
            let mut is_applied_l0_compact = false;
            for group_delta in &group_deltas.group_deltas {
                match group_delta {
                    GroupDeltaCommon::GroupConstruct(group_construct) => {
                        let mut new_levels = build_initial_compaction_group_levels(
                            *compaction_group_id,
                            group_construct.get_group_config().unwrap(),
                        );
                        let parent_group_id = group_construct.parent_group_id;
                        new_levels.parent_group_id = parent_group_id;
                        #[expect(deprecated)]
                        // for backward-compatibility of previous hummock version delta
                        new_levels
                            .member_table_ids
                            .clone_from(&group_construct.table_ids);
                        self.levels.insert(*compaction_group_id, new_levels);
                        // Newer deltas derive member tables from state_table_info;
                        // older ones carry them in the (deprecated) table_ids field.
                        let member_table_ids = if group_construct.version()
                            >= CompatibilityVersion::NoMemberTableIds
                        {
                            self.state_table_info
                                .compaction_group_member_table_ids(*compaction_group_id)
                                .iter()
                                .map(|table_id| table_id.table_id)
                                .collect()
                        } else {
                            #[expect(deprecated)]
                            // for backward-compatibility of previous hummock version delta
                            BTreeSet::from_iter(group_construct.table_ids.clone())
                        };

                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
                            // Newer split: by explicit split key.
                            let split_key = if group_construct.split_key.is_some() {
                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
                            } else {
                                None
                            };
                            self.init_with_parent_group_v2(
                                parent_group_id,
                                *compaction_group_id,
                                group_construct.get_new_sst_start_id(),
                                split_key.clone(),
                            );
                        } else {
                            // Older split: by member table id set.
                            self.init_with_parent_group(
                                parent_group_id,
                                *compaction_group_id,
                                member_table_ids,
                                group_construct.get_new_sst_start_id(),
                            );
                        }
                    }
                    GroupDeltaCommon::GroupMerge(group_merge) => {
                        tracing::info!(
                            "group_merge left {:?} right {:?}",
                            group_merge.left_group_id,
                            group_merge.right_group_id
                        );
                        self.merge_compaction_group(
                            group_merge.left_group_id,
                            group_merge.right_group_id,
                        )
                    }
                    GroupDeltaCommon::IntraLevel(level_delta) => {
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });
                        if is_commit_epoch {
                            // Commit path: only new overlapping L0 sub levels
                            // may be added; nothing may be removed.
                            assert!(
                                level_delta.removed_table_ids.is_empty(),
                                "no sst should be deleted when committing an epoch"
                            );

                            let IntraLevelDelta {
                                level_idx,
                                l0_sub_level_id,
                                inserted_table_infos,
                                ..
                            } = level_delta;
                            {
                                assert_eq!(
                                    *level_idx, 0,
                                    "we should only add to L0 when we commit an epoch."
                                );
                                if !inserted_table_infos.is_empty() {
                                    insert_new_sub_level(
                                        &mut levels.l0,
                                        *l0_sub_level_id,
                                        PbLevelType::Overlapping,
                                        inserted_table_infos.clone(),
                                        None,
                                    );
                                }
                            }
                        } else {
                            // Compaction path: swap removed inputs for outputs.
                            levels.apply_compact_ssts(
                                level_delta,
                                self.state_table_info
                                    .compaction_group_member_table_ids(*compaction_group_id),
                            );
                            if level_delta.level_idx == 0 {
                                is_applied_l0_compact = true;
                            }
                        }
                    }
                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });
                        // New L0 sub levels only appear in commit-epoch deltas.
                        assert!(is_commit_epoch);

                        if !inserted_table_infos.is_empty() {
                            let next_l0_sub_level_id = levels
                                .l0
                                .sub_levels
                                .last()
                                .map(|level| level.sub_level_id + 1)
                                .unwrap_or(1);

                            insert_new_sub_level(
                                &mut levels.l0,
                                next_l0_sub_level_id,
                                PbLevelType::Overlapping,
                                inserted_table_infos.clone(),
                                None,
                            );
                        }
                    }
                    GroupDeltaCommon::GroupDestroy(_) => {
                        self.levels.remove(compaction_group_id);
                    }
                }
            }
            // After L0 compaction, drop emptied sub levels and refresh stats.
            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
            {
                levels.post_apply_l0_compact();
            }
        }
        self.id = version_delta.id;
        #[expect(deprecated)]
        {
            self.max_committed_epoch = version_delta.max_committed_epoch;
        }

        // Apply the watermark changes: `None` marks a watermark for removal,
        // `Some` an updated watermark to be stored back.
        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
            HashMap::new();

        // Merge newly added watermarks into the existing ones.
        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
                if version_delta.removed_table_ids.contains(table_id) {
                    modified_table_watermarks.insert(*table_id, None);
                } else {
                    let mut current_table_watermarks = (**current_table_watermarks).clone();
                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
                }
            } else {
                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
            }
        }
        // Drop watermark entries older than each table's committed epoch.
        for (table_id, table_watermarks) in &self.table_watermarks {
            let safe_epoch = if let Some(state_table_info) =
                self.state_table_info.info().get(table_id)
                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
                && state_table_info.committed_epoch > *oldest_epoch
            {
                // safe epoch of this table has progressed, need further clear.
                state_table_info.committed_epoch
            } else {
                // safe epoch not progressed or the table has been removed. No need to truncate
                continue;
            };
            let table_watermarks = modified_table_watermarks
                .entry(*table_id)
                .or_insert_with(|| Some((**table_watermarks).clone()));
            if let Some(table_watermarks) = table_watermarks {
                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
            }
        }
        // Store back the modified watermarks.
        for (table_id, table_watermarks) in modified_table_watermarks {
            if let Some(table_watermarks) = table_watermarks {
                self.table_watermarks
                    .insert(table_id, Arc::new(table_watermarks));
            } else {
                self.table_watermarks.remove(&table_id);
            }
        }

        // Finally, apply the change-log deltas for all tables.
        Self::apply_change_log_delta(
            &mut self.table_change_log,
            &version_delta.change_log_delta,
            &version_delta.removed_table_ids,
            &version_delta.state_table_info_delta,
            &changed_table_info,
        );
    }
699
700 pub fn apply_change_log_delta<T: Clone>(
701 table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
702 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
703 removed_table_ids: &HashSet<TableId>,
704 state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
705 changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
706 ) {
707 for (table_id, change_log_delta) in change_log_delta {
708 let new_change_log = &change_log_delta.new_log;
709 match table_change_log.entry(*table_id) {
710 Entry::Occupied(entry) => {
711 let change_log = entry.into_mut();
712 change_log.add_change_log(new_change_log.clone());
713 }
714 Entry::Vacant(entry) => {
715 entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
716 }
717 };
718 }
719
720 table_change_log.retain(|table_id, _| {
724 if removed_table_ids.contains(table_id) {
725 return false;
726 }
727 if let Some(table_info_delta) = state_table_info_delta.get(table_id)
728 && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
729 } else {
731 return true;
733 }
734 let contains = change_log_delta.contains_key(table_id);
735 if !contains {
736 warn!(
737 ?table_id,
738 "table change log dropped due to no further change log at newly committed epoch",
739 );
740 }
741 contains
742 });
743
744 for (table_id, change_log_delta) in change_log_delta {
746 if let Some(change_log) = table_change_log.get_mut(table_id) {
747 change_log.truncate(change_log_delta.truncate_epoch);
748 }
749 }
750 }
751
752 pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
753 let mut ret: BTreeMap<_, _> = BTreeMap::new();
754 for (compaction_group_id, group) in &self.levels {
755 let mut levels = vec![];
756 levels.extend(group.l0.sub_levels.iter());
757 levels.extend(group.levels.iter());
758 for level in levels {
759 for table_info in &level.table_infos {
760 if table_info.sst_id == table_info.object_id {
761 continue;
762 }
763 let object_id = table_info.object_id;
764 let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
765 entry
766 .entry(*compaction_group_id)
767 .or_default()
768 .push(table_info.sst_id)
769 }
770 }
771 }
772 ret
773 }
774
775 pub fn merge_compaction_group(
776 &mut self,
777 left_group_id: CompactionGroupId,
778 right_group_id: CompactionGroupId,
779 ) {
780 let left_group_id_table_ids = self
782 .state_table_info
783 .compaction_group_member_table_ids(left_group_id)
784 .iter()
785 .map(|table_id| table_id.table_id);
786 let right_group_id_table_ids = self
787 .state_table_info
788 .compaction_group_member_table_ids(right_group_id)
789 .iter()
790 .map(|table_id| table_id.table_id);
791
792 assert!(
793 left_group_id_table_ids
794 .chain(right_group_id_table_ids)
795 .is_sorted()
796 );
797
798 let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
799 let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
800 panic!(
801 "compaction group should exist right {} all {:?}",
802 right_group_id, total_cg
803 )
804 });
805
806 let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
807 panic!(
808 "compaction group should exist left {} all {:?}",
809 left_group_id, total_cg
810 )
811 });
812
813 group_split::merge_levels(left_levels, right_levels);
814 }
815
    /// Split-key variant of [`Self::init_with_parent_group`]: initializes the
    /// levels of the new group `group_id` by splitting the parent group's
    /// ssts at `split_key` instead of by member table ids.
    ///
    /// When `split_key` is `None`, no ssts are branched into the new group.
    /// Branch sst ids are allocated from `new_sst_start_id`. Parent ssts left
    /// with no table ids are dropped and the level/L0 stats adjusted.
    ///
    /// # Panics
    /// Panics (debug builds) on a non-zero `new_sst_start_id` for
    /// `NewCompactionGroup`; `unreachable!` for a non-existing parent group.
    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: u64,
        split_key: Option<Bytes>,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            // A brand-new group has no parent to split from; nothing to do.
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // Both groups change shape; bump their version ids so stale compaction
        // tasks can be detected (see `is_compaction_task_expired`).
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Split the sub level's ssts at the split key, if provided.
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                if insert_table_infos.is_empty() {
                    continue;
                }

                // Drop parent ssts that no longer hold any table, keeping the
                // sub level and L0 aggregate sizes consistent.
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                // Place the branch ssts into the matching sub level of the new
                // group, creating the sub level when it does not exist yet.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }

        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Bottom levels must stay sorted and concatenatable by key range.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        // Invariant: no L0 sub level in either group may be left empty.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
953}
954
955impl<T> HummockVersionCommon<T>
956where
957 T: SstableIdReader + ObjectIdReader,
958{
959 pub fn get_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
960 self.get_sst_infos(exclude_change_log)
961 .map(|s| s.object_id())
962 .collect()
963 }
964
965 pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
966 self.get_sst_infos(exclude_change_log)
967 .map(|s| s.sst_id())
968 .collect()
969 }
970
971 pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
972 let may_table_change_log = if exclude_change_log {
973 None
974 } else {
975 Some(self.table_change_log.values())
976 };
977 self.get_combined_levels()
978 .flat_map(|level| level.table_infos.iter())
979 .chain(
980 may_table_change_log
981 .map(|v| {
982 v.flat_map(|table_change_log| {
983 table_change_log.iter().flat_map(|epoch_change_log| {
984 epoch_change_log
985 .old_value
986 .iter()
987 .chain(epoch_change_log.new_value.iter())
988 })
989 })
990 })
991 .into_iter()
992 .flatten(),
993 )
994 }
995}
996
impl Levels {
    /// Applies a compaction-generated intra-level delta: removes the compacted
    /// input ssts and inserts the output ssts into the target (sub-)level.
    ///
    /// A delta whose `compaction_group_version_id` does not match the group's
    /// current one is treated as the result of an expired compaction task and
    /// ignored with a warning.
    pub(crate) fn apply_compact_ssts(
        &mut self,
        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
        member_table_ids: &BTreeSet<TableId>,
    ) {
        let IntraLevelDeltaCommon {
            level_idx,
            l0_sub_level_id,
            inserted_table_infos: insert_table_infos,
            vnode_partition_count,
            removed_table_ids: delete_sst_ids_set,
            compaction_group_version_id,
        } = level_delta;
        let new_vnode_partition_count = *vnode_partition_count;

        if is_compaction_task_expired(
            self.compaction_group_version_id,
            *compaction_group_version_id,
        ) {
            warn!(
                current_compaction_group_version_id = self.compaction_group_version_id,
                delta_compaction_group_version_id = compaction_group_version_id,
                level_idx,
                l0_sub_level_id,
                insert_table_infos = ?insert_table_infos
                    .iter()
                    .map(|sst| (sst.sst_id, sst.object_id))
                    .collect_vec(),
                ?delete_sst_ids_set,
                "This VersionDelta may be committed by an expired compact task. Please check it."
            );
            return;
        }
        // Remove the compaction inputs from their level(s). For L0 the inputs
        // may be spread over several sub levels.
        if !delete_sst_ids_set.is_empty() {
            if *level_idx == 0 {
                for level in &mut self.l0.sub_levels {
                    level_delete_ssts(level, delete_sst_ids_set);
                }
            } else {
                // `levels` is indexed from level 1.
                let idx = *level_idx as usize - 1;
                level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
            }
        }

        // Insert the compaction outputs into the target (sub-)level, possibly
        // adjusting its vnode partitioning.
        if !insert_table_infos.is_empty() {
            let insert_sst_level_id = *level_idx;
            let insert_sub_level_id = *l0_sub_level_id;
            if insert_sst_level_id == 0 {
                let l0 = &mut self.l0;
                // The target sub level must already exist.
                let index = l0
                    .sub_levels
                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
                assert!(
                    index < l0.sub_levels.len()
                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
                    "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
                    insert_sub_level_id,
                    delete_sst_ids_set,
                    l0.sub_levels
                        .iter()
                        .map(|level| level.sub_level_id)
                        .collect_vec()
                );
                if l0.sub_levels[index].table_infos.is_empty()
                    && member_table_ids.len() == 1
                    && insert_table_infos.iter().all(|sst| {
                        sst.table_ids.len() == 1
                            && sst.table_ids[0]
                                == member_table_ids.iter().next().expect("non-empty").table_id
                    })
                {
                    // Only change vnode_partition_count for the level which is
                    // accepting data of a single state table.
                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
                }
                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
            } else {
                // `levels` is indexed from level 1.
                let idx = insert_sst_level_id as usize - 1;
                if self.levels[idx].table_infos.is_empty()
                    && insert_table_infos
                        .iter()
                        .all(|sst| sst.table_ids.len() == 1)
                {
                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
                } else if self.levels[idx].vnode_partition_count != 0
                    && new_vnode_partition_count == 0
                    && member_table_ids.len() > 1
                {
                    self.levels[idx].vnode_partition_count = 0;
                }
                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
            }
        }
    }

    /// Drops now-empty L0 sub levels and recomputes the aggregated L0 size
    /// stats. Called after all L0 intra-level deltas of a group have been
    /// applied.
    pub(crate) fn post_apply_l0_compact(&mut self) {
        {
            self.l0
                .sub_levels
                .retain(|level| !level.table_infos.is_empty());
            self.l0.total_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.total_file_size)
                .sum::<u64>();
            self.l0.uncompressed_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.uncompressed_file_size)
                .sum::<u64>();
        }
    }
}
1113
1114impl<T, L> HummockVersionCommon<T, L> {
1115 pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1116 self.levels
1117 .values()
1118 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1119 }
1120}
1121
1122pub fn build_initial_compaction_group_levels(
1123 group_id: CompactionGroupId,
1124 compaction_config: &CompactionConfig,
1125) -> Levels {
1126 let mut levels = vec![];
1127 for l in 0..compaction_config.get_max_level() {
1128 levels.push(Level {
1129 level_idx: (l + 1) as u32,
1130 level_type: PbLevelType::Nonoverlapping,
1131 table_infos: vec![],
1132 total_file_size: 0,
1133 sub_level_id: 0,
1134 uncompressed_file_size: 0,
1135 vnode_partition_count: 0,
1136 });
1137 }
1138 #[expect(deprecated)] Levels {
1140 levels,
1141 l0: OverlappingLevel {
1142 sub_levels: vec![],
1143 total_file_size: 0,
1144 uncompressed_file_size: 0,
1145 },
1146 group_id,
1147 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
1148 member_table_ids: vec![],
1149 compaction_group_version_id: 0,
1150 }
1151}
1152
/// Splits every sst of `level` that contains data of any `member_table_ids`:
/// the member tables' portion is branched into a new sst (returned for
/// insertion into the new group) while the sst kept in `level` is shrunk to
/// the remaining tables. `new_sst_id` is advanced for each branch created.
fn split_sst_info_for_level(
    member_table_ids: &BTreeSet<u32>,
    level: &mut Level,
    new_sst_id: &mut u64,
) -> Vec<SstableInfo> {
    // Remove it from current level and place it in the front of target_level
    let mut insert_table_infos = vec![];
    for sst_info in &mut level.table_infos {
        // Table ids of this sst that belong to the new group.
        let removed_table_ids = sst_info
            .table_ids
            .iter()
            .filter(|table_id| member_table_ids.contains(table_id))
            .cloned()
            .collect_vec();
        let sst_size = sst_info.sst_size;
        // A size below 2 would make the halved branch sizes zero.
        if sst_size / 2 == 0 {
            tracing::warn!(
                id = sst_info.sst_id,
                object_id = sst_info.object_id,
                sst_size = sst_info.sst_size,
                file_size = sst_info.file_size,
                "Sstable sst_size is under expected",
            );
        };
        if !removed_table_ids.is_empty() {
            // Branch the sst: both halves are attributed half the original size.
            let (modified_sst, branch_sst) = split_sst_with_table_ids(
                sst_info,
                new_sst_id,
                sst_size / 2,
                sst_size / 2,
                member_table_ids.iter().cloned().collect_vec(),
            );
            *sst_info = modified_sst;
            insert_table_infos.push(branch_sst);
        }
    }
    insert_table_infos
}
1192
1193pub fn get_compaction_group_ids(
1195 version: &HummockVersion,
1196) -> impl Iterator<Item = CompactionGroupId> + '_ {
1197 version.levels.keys().cloned()
1198}
1199
1200pub fn get_table_compaction_group_id_mapping(
1201 version: &HummockVersion,
1202) -> HashMap<StateTableId, CompactionGroupId> {
1203 version
1204 .state_table_info
1205 .info()
1206 .iter()
1207 .map(|(table_id, info)| (table_id.table_id, info.compaction_group_id))
1208 .collect()
1209}
1210
1211pub fn get_compaction_group_ssts(
1213 version: &HummockVersion,
1214 group_id: CompactionGroupId,
1215) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1216 let group_levels = version.get_compaction_group_levels(group_id);
1217 group_levels
1218 .l0
1219 .sub_levels
1220 .iter()
1221 .rev()
1222 .chain(group_levels.levels.iter())
1223 .flat_map(|level| {
1224 level
1225 .table_infos
1226 .iter()
1227 .map(|table_info| (table_info.object_id, table_info.sst_id))
1228 })
1229}
1230
1231pub fn new_sub_level(
1232 sub_level_id: u64,
1233 level_type: PbLevelType,
1234 table_infos: Vec<SstableInfo>,
1235) -> Level {
1236 if level_type == PbLevelType::Nonoverlapping {
1237 debug_assert!(
1238 can_concat(&table_infos),
1239 "sst of non-overlapping level is not concat-able: {:?}",
1240 table_infos
1241 );
1242 }
1243 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1244 let uncompressed_file_size = table_infos
1245 .iter()
1246 .map(|table| table.uncompressed_file_size)
1247 .sum();
1248 Level {
1249 level_idx: 0,
1250 level_type,
1251 table_infos,
1252 total_file_size,
1253 sub_level_id,
1254 uncompressed_file_size,
1255 vnode_partition_count: 0,
1256 }
1257}
1258
1259pub fn add_ssts_to_sub_level(
1260 l0: &mut OverlappingLevel,
1261 sub_level_idx: usize,
1262 insert_table_infos: Vec<SstableInfo>,
1263) {
1264 insert_table_infos.iter().for_each(|sst| {
1265 l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1266 l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1267 l0.total_file_size += sst.sst_size;
1268 l0.uncompressed_file_size += sst.uncompressed_file_size;
1269 });
1270 l0.sub_levels[sub_level_idx]
1271 .table_infos
1272 .extend(insert_table_infos);
1273 if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1274 l0.sub_levels[sub_level_idx]
1275 .table_infos
1276 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1277 assert!(
1278 can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1279 "sstable ids: {:?}",
1280 l0.sub_levels[sub_level_idx]
1281 .table_infos
1282 .iter()
1283 .map(|sst| sst.sst_id)
1284 .collect_vec()
1285 );
1286 }
1287}
1288
1289pub fn insert_new_sub_level(
1291 l0: &mut OverlappingLevel,
1292 insert_sub_level_id: u64,
1293 level_type: PbLevelType,
1294 insert_table_infos: Vec<SstableInfo>,
1295 sub_level_insert_hint: Option<usize>,
1296) {
1297 if insert_sub_level_id == u64::MAX {
1298 return;
1299 }
1300 let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1301 insert_pos
1302 } else {
1303 if let Some(newest_level) = l0.sub_levels.last() {
1304 assert!(
1305 newest_level.sub_level_id < insert_sub_level_id,
1306 "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1307 newest_level.sub_level_id,
1308 insert_sub_level_id,
1309 l0,
1310 );
1311 }
1312 l0.sub_levels.len()
1313 };
1314 #[cfg(debug_assertions)]
1315 {
1316 if insert_pos > 0 {
1317 if let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1) {
1318 debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1319 }
1320 }
1321 if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1322 debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1323 }
1324 }
1325 let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1328 l0.total_file_size += level.total_file_size;
1329 l0.uncompressed_file_size += level.uncompressed_file_size;
1330 l0.sub_levels.insert(insert_pos, level);
1331}
1332
1333fn level_delete_ssts(
1337 operand: &mut Level,
1338 delete_sst_ids_superset: &HashSet<HummockSstableId>,
1339) -> bool {
1340 let original_len = operand.table_infos.len();
1341 operand
1342 .table_infos
1343 .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1344 operand.total_file_size = operand
1345 .table_infos
1346 .iter()
1347 .map(|table| table.sst_size)
1348 .sum::<u64>();
1349 operand.uncompressed_file_size = operand
1350 .table_infos
1351 .iter()
1352 .map(|table| table.uncompressed_file_size)
1353 .sum::<u64>();
1354 original_len != operand.table_infos.len()
1355}
1356
1357fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1358 fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1359 format!(
1360 "sstable ids: {:?}",
1361 ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1362 )
1363 }
1364 operand.total_file_size += insert_table_infos
1365 .iter()
1366 .map(|sst| sst.sst_size)
1367 .sum::<u64>();
1368 operand.uncompressed_file_size += insert_table_infos
1369 .iter()
1370 .map(|sst| sst.uncompressed_file_size)
1371 .sum::<u64>();
1372 if operand.level_type == PbLevelType::Overlapping {
1373 operand.level_type = PbLevelType::Nonoverlapping;
1374 operand
1375 .table_infos
1376 .extend(insert_table_infos.iter().cloned());
1377 operand
1378 .table_infos
1379 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1380 assert!(
1381 can_concat(&operand.table_infos),
1382 "{}",
1383 display_sstable_infos(&operand.table_infos)
1384 );
1385 } else if !insert_table_infos.is_empty() {
1386 let sorted_insert: Vec<_> = insert_table_infos
1387 .iter()
1388 .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1389 .cloned()
1390 .collect();
1391 let first = &sorted_insert[0];
1392 let last = &sorted_insert[sorted_insert.len() - 1];
1393 let pos = operand
1394 .table_infos
1395 .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1396 if pos >= operand.table_infos.len()
1397 || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1398 {
1399 operand.table_infos.splice(pos..pos, sorted_insert);
1400 let validate_range = operand
1402 .table_infos
1403 .iter()
1404 .skip(pos.saturating_sub(1))
1405 .take(insert_table_infos.len() + 2)
1406 .collect_vec();
1407 assert!(
1408 can_concat(&validate_range),
1409 "{}",
1410 display_sstable_infos(&validate_range),
1411 );
1412 } else {
1413 warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1416 for i in insert_table_infos {
1417 let pos = operand
1418 .table_infos
1419 .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1420 operand.table_infos.insert(pos, i.clone());
1421 }
1422 assert!(
1423 can_concat(&operand.table_infos),
1424 "{}",
1425 display_sstable_infos(&operand.table_infos)
1426 );
1427 }
1428 }
1429}
1430
1431pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockSstableObjectId, u64> {
1432 version
1433 .levels
1434 .values()
1435 .flat_map(|cg| {
1436 cg.level0()
1437 .sub_levels
1438 .iter()
1439 .chain(cg.levels.iter())
1440 .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1441 })
1442 .chain(version.table_change_log.values().flat_map(|c| {
1443 c.iter().flat_map(|l| {
1444 l.old_value
1445 .iter()
1446 .chain(l.new_value.iter())
1447 .map(|t| (t.object_id, t.file_size))
1448 })
1449 }))
1450 .collect()
1451}
1452
1453pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1456 let mut res = Vec::new();
1457 for (group_id, levels) in &version.levels {
1459 if levels.group_id != *group_id {
1461 res.push(format!(
1462 "GROUP {}: inconsistent group id {} in Levels",
1463 group_id, levels.group_id
1464 ));
1465 }
1466
1467 let validate_level = |group: CompactionGroupId,
1468 expected_level_idx: u32,
1469 level: &Level,
1470 res: &mut Vec<String>| {
1471 let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1472 if level.level_idx == 0 {
1473 level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1474 if level.table_infos.is_empty() {
1476 res.push(format!("{}: empty level", level_identifier));
1477 }
1478 } else if level.level_type != PbLevelType::Nonoverlapping {
1479 res.push(format!(
1481 "{}: level type {:?} is not non-overlapping",
1482 level_identifier, level.level_type
1483 ));
1484 }
1485
1486 if level.level_idx != expected_level_idx {
1488 res.push(format!(
1489 "{}: mismatched level idx {}",
1490 level_identifier, expected_level_idx
1491 ));
1492 }
1493
1494 let mut prev_table_info: Option<&SstableInfo> = None;
1495 for table_info in &level.table_infos {
1496 if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1498 res.push(format!(
1499 "{} SST {}: table_ids not sorted",
1500 level_identifier, table_info.object_id
1501 ));
1502 }
1503
1504 if level.level_type == PbLevelType::Nonoverlapping {
1506 if let Some(prev) = prev_table_info.take() {
1507 if prev
1508 .key_range
1509 .compare_right_with(&table_info.key_range.left)
1510 != Ordering::Less
1511 {
1512 res.push(format!(
1513 "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1514 level_identifier, table_info.object_id, prev, table_info
1515 ));
1516 }
1517 }
1518 let _ = prev_table_info.insert(table_info);
1519 }
1520 }
1521 };
1522
1523 let l0 = &levels.l0;
1524 let mut prev_sub_level_id = u64::MAX;
1525 for sub_level in &l0.sub_levels {
1526 if sub_level.sub_level_id >= prev_sub_level_id {
1528 res.push(format!(
1529 "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1530 group_id, sub_level.level_idx, prev_sub_level_id
1531 ));
1532 }
1533 prev_sub_level_id = sub_level.sub_level_id;
1534
1535 validate_level(*group_id, 0, sub_level, &mut res);
1536 }
1537
1538 for idx in 1..=levels.levels.len() {
1539 validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1540 }
1541 }
1542 res
1543}
1544
1545#[cfg(test)]
1546mod tests {
1547 use std::collections::{HashMap, HashSet};
1548
1549 use bytes::Bytes;
1550 use risingwave_common::catalog::TableId;
1551 use risingwave_common::hash::VirtualNode;
1552 use risingwave_common::util::epoch::test_epoch;
1553 use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1554
1555 use super::group_split;
1556 use crate::HummockVersionId;
1557 use crate::compaction_group::group_split::*;
1558 use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1559 use crate::key::{FullKey, gen_key_from_str};
1560 use crate::key_range::KeyRange;
1561 use crate::level::{Level, Levels, OverlappingLevel};
1562 use crate::sstable_info::{SstableInfo, SstableInfoInner};
1563 use crate::version::{
1564 GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1565 };
1566
1567 fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1568 gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1569 }
1570
1571 fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1572 let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1573 let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1574 let full_key_l = FullKey::for_test(
1575 TableId::new(*table_ids.first().unwrap()),
1576 table_key_l,
1577 epoch,
1578 )
1579 .encode();
1580 let full_key_r =
1581 FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1582 .encode();
1583
1584 SstableInfoInner {
1585 sst_id,
1586 key_range: KeyRange {
1587 left: full_key_l.into(),
1588 right: full_key_r.into(),
1589 right_exclusive: false,
1590 },
1591 table_ids,
1592 object_id: sst_id,
1593 min_epoch: 20,
1594 max_epoch: 20,
1595 file_size: 100,
1596 sst_size: 100,
1597 ..Default::default()
1598 }
1599 }
1600
    /// `get_object_ids` must count objects from both the L0 sub-levels and the
    /// non-L0 levels of a compaction group.
    #[test]
    fn test_get_sst_object_ids() {
        // Start from a version with one compaction group and no SSTs at all.
        let mut version = HummockVersion {
            id: HummockVersionId::new(0),
            levels: HashMap::from_iter([(
                0,
                Levels {
                    levels: vec![],
                    l0: OverlappingLevel {
                        sub_levels: vec![],
                        total_file_size: 0,
                        uncompressed_file_size: 0,
                    },
                    ..Default::default()
                },
            )]),
            ..Default::default()
        };
        assert_eq!(version.get_object_ids(false).len(), 0);

        // An SST added to an L0 sub-level is counted.
        version
            .levels
            .get_mut(&0)
            .unwrap()
            .l0
            .sub_levels
            .push(Level {
                table_infos: vec![
                    SstableInfoInner {
                        object_id: 11,
                        sst_id: 11,
                        ..Default::default()
                    }
                    .into(),
                ],
                ..Default::default()
            });
        assert_eq!(version.get_object_ids(false).len(), 1);

        // An SST added to a non-L0 level is counted as well.
        version.levels.get_mut(&0).unwrap().levels.push(Level {
            table_infos: vec![
                SstableInfoInner {
                    object_id: 22,
                    sst_id: 22,
                    ..Default::default()
                }
                .into(),
            ],
            ..Default::default()
        });
        assert_eq!(version.get_object_ids(false).len(), 2);
    }
1655
    /// Applying a delta that constructs group 2, destroys group 0 and adds an
    /// SST to group 1 must yield exactly groups {1, 2} with the SST in place.
    #[test]
    fn test_apply_version_delta() {
        // Initial version: two empty groups, ids 0 and 1.
        let mut version = HummockVersion {
            id: HummockVersionId::new(0),
            levels: HashMap::from_iter([
                (
                    0,
                    build_initial_compaction_group_levels(
                        0,
                        &CompactionConfig {
                            max_level: 6,
                            ..Default::default()
                        },
                    ),
                ),
                (
                    1,
                    build_initial_compaction_group_levels(
                        1,
                        &CompactionConfig {
                            max_level: 6,
                            ..Default::default()
                        },
                    ),
                ),
            ]),
            ..Default::default()
        };
        let version_delta = HummockVersionDelta {
            id: HummockVersionId::new(1),
            group_deltas: HashMap::from_iter([
                // Construct a new group 2.
                (
                    2,
                    GroupDeltas {
                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
                            group_config: Some(CompactionConfig {
                                max_level: 6,
                                ..Default::default()
                            }),
                            ..Default::default()
                        }))],
                    },
                ),
                // Destroy group 0.
                (
                    0,
                    GroupDeltas {
                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
                    },
                ),
                // Insert SST 1 into level 1 of group 1.
                (
                    1,
                    GroupDeltas {
                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
                            1,
                            0,
                            HashSet::new(),
                            vec![
                                SstableInfoInner {
                                    object_id: 1,
                                    sst_id: 1,
                                    ..Default::default()
                                }
                                .into(),
                            ],
                            0,
                            version
                                .levels
                                .get(&1)
                                .as_ref()
                                .unwrap()
                                .compaction_group_version_id,
                        ))],
                    },
                ),
            ]),
            ..Default::default()
        };
        let version_delta = version_delta;

        version.apply_version_delta(&version_delta);
        // Expected state of group 1: the SST sits in level 1.
        let mut cg1 = build_initial_compaction_group_levels(
            1,
            &CompactionConfig {
                max_level: 6,
                ..Default::default()
            },
        );
        cg1.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                SstableInfoInner {
                    object_id: 1,
                    sst_id: 1,
                    ..Default::default()
                }
                .into(),
            ],
            ..Default::default()
        };
        // Group 0 is gone; group 2 is freshly constructed; group 1 matches cg1.
        assert_eq!(
            version,
            HummockVersion {
                id: HummockVersionId::new(1),
                levels: HashMap::from_iter([
                    (
                        2,
                        build_initial_compaction_group_levels(
                            2,
                            &CompactionConfig {
                                max_level: 6,
                                ..Default::default()
                            },
                        ),
                    ),
                    (1, cg1),
                ]),
                ..Default::default()
            }
        );
    }
1777
1778 fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1779 gen_sst_info_impl(object_id, table_ids, left, right).into()
1780 }
1781
    /// Builds the inner representation of a test SST; `sst_id` mirrors
    /// `object_id` and all sizes are fixed at 100 so size-based assertions in
    /// the tests stay simple.
    fn gen_sst_info_impl(
        object_id: u64,
        table_ids: Vec<u32>,
        left: Bytes,
        right: Bytes,
    ) -> SstableInfoInner {
        SstableInfoInner {
            object_id,
            sst_id: object_id, // test convention: sst id == object id
            key_range: KeyRange {
                left,
                right,
                right_exclusive: false,
            },
            table_ids,
            file_size: 100,
            sst_size: 100,
            uncompressed_file_size: 100,
            ..Default::default()
        }
    }
1803
    /// `merge_levels` must move all sub-levels and base-level SSTs of the
    /// right group into the left group, preserving order; when both sides have
    /// L0 data the right-hand sub-levels receive fresh, larger sub-level ids.
    #[test]
    fn test_merge_levels() {
        let mut left_levels = build_initial_compaction_group_levels(
            1,
            &CompactionConfig {
                max_level: 6,
                ..Default::default()
            },
        );

        let mut right_levels = build_initial_compaction_group_levels(
            2,
            &CompactionConfig {
                max_level: 6,
                ..Default::default()
            },
        );

        // Left group: level 1 holds three SSTs covering tables 3 and 4.
        left_levels.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![3],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![3, 4],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![4],
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

        // Left group L0: three sub-levels with ids 101, 103, 105.
        left_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![3],
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        left_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![3],
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 103,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        left_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![3],
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 105,
            level_type: LevelType::Nonoverlapping,
            total_file_size: 100,
            ..Default::default()
        });

        // Right group: level 1 holds three SSTs covering tables 5 and 6.
        right_levels.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![5],
                    FullKey::for_test(
                        TableId::new(5),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(5),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![5, 6],
                    FullKey::for_test(
                        TableId::new(5),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(6),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![6],
                    FullKey::for_test(
                        TableId::new(6),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(6),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

        // Right group L0: three sub-levels with ids 101, 102, 103.
        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                5,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 102,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 103,
            level_type: LevelType::Nonoverlapping,
            total_file_size: 100,
            ..Default::default()
        });

        {
            // Merging two empty groups is a no-op (must not panic).
            let mut left_levels = Levels::default();
            let right_levels = Levels::default();

            group_split::merge_levels(&mut left_levels, right_levels);
        }

        {
            // Empty left group: the right group's sub-levels move over and
            // keep their original sub-level ids.
            let mut left_levels = build_initial_compaction_group_levels(
                1,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

        {
            // Empty right group: the left group is unchanged.
            let mut left_levels = left_levels.clone();
            let right_levels = build_initial_compaction_group_levels(
                2,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

        {
            // Both groups non-empty: right sub-levels are appended after the
            // left ones and re-numbered past the left maximum (105 -> 106,
            // 107, 108); base level sizes accumulate.
            let mut left_levels = left_levels.clone();
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 6);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(600, left_levels.levels[0].total_file_size);
        }
    }
2186
2187 #[test]
2188 fn test_get_split_pos() {
2189 let epoch = test_epoch(1);
2190 let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2191 let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2192 let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2193
2194 let ssts = vec![s1, s2, s3];
2195 let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2196
2197 let pos = group_split::get_split_pos(&ssts, split_key.clone());
2198 assert_eq!(1, pos);
2199
2200 let pos = group_split::get_split_pos(&vec![], split_key);
2201 assert_eq!(0, pos);
2202 }
2203
    /// Exercises `need_to_split` / `split_sst` on an SST covering tables
    /// [1, 2, 3, 5]: a split in the middle (both sides populated), splits at
    /// either boundary, and a split of a degenerate (point) key range.
    #[test]
    fn test_split_sst() {
        let epoch = test_epoch(1);
        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);

        {
            // Split at table 3: both halves contain data.
            let split_key = group_split::build_split_key(3, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10;
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            // The left part ends (exclusively) no later than the right part
            // begins, and the table ids are partitioned between the halves.
            assert!(origin_sst.key_range.right_exclusive);
            assert!(
                origin_sst
                    .key_range
                    .right
                    .cmp(&branched_sst.key_range.left)
                    .is_le()
            );
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            // The branch takes `new_sst_id` (10); the original gets the next id.
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(&3, branched_sst.table_ids.first().unwrap());
        }

        {
            // Split at table 4, which is absent from the SST: still splits
            // into both parts; the branch starts at the next present id, 5.
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10;
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(&5, branched_sst.table_ids.first().unwrap());
        }

        {
            // Split key past the last table: everything stays on the left.
            let split_key = group_split::build_split_key(6, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Left, split_type);
        }

        {
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Both, split_type);

            // Split key at the first table: everything goes to the right.
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Right, split_type);
        }

        {
            // Degenerate key range (left == right) split at its own table id:
            // nothing remains on the left; the whole SST becomes the branch.
            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
            sst.key_range.right = sst.key_range.left.clone();
            let sst: SstableInfo = sst.into();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;

            let mut new_sst_id = 10;
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            assert!(origin_sst.is_none());
            assert!(branched_sst.is_some());
        }
    }
2317
    #[test]
    fn test_split_sst_info_for_level() {
        // Exercises `group_split::split_sst_info_for_level_v2`, which removes the
        // SSTs at/after a split key from a level and returns them (splitting the
        // SST that straddles the key in two). Five scenarios below: split an L0
        // sub-level and merge the result back, split an empty level, split below
        // all data, split above all data, and split through a boundary SST.
        let mut version = HummockVersion {
            id: HummockVersionId(0),
            // Single compaction group (id 1) with 6 empty levels to start.
            levels: HashMap::from_iter([(
                1,
                build_initial_compaction_group_levels(
                    1,
                    &CompactionConfig {
                        max_level: 6,
                        ..Default::default()
                    },
                ),
            )]),
            ..Default::default()
        };

        let cg1 = version.levels.get_mut(&1).unwrap();

        // L1 fixture: three non-overlapping SSTs ordered by key.
        //   sst 1:  table 3 only (vnode 1 .. vnode 200)
        //   sst 10: spans tables 3 and 4 — the SST a table-4 split key cuts through
        //   sst 11: table 4 only
        cg1.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![3],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![3, 4],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![4],
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

        // L0 fixture: one overlapping sub-level (sub_level_id 101) holding five
        // SSTs (ids 2, 22, 23, 24, 25) whose key ranges cover tables 0..=2 only,
        // i.e. entirely to the left of a table-1+ split key's right side except
        // for the table-2 data.
        cg1.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![
                gen_sst_info(
                    2,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    22,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    23,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    24,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    25,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 300,
            ..Default::default()
        });

        // Scenario 1: split the L0 sub-level at (table 1, vnode 0), wrap the
        // returned SSTs into a right-hand `Levels`, and merge them back into cg1
        // via `merge_levels`. No assertions here — this mutates cg1 in place and
        // mainly checks the split + merge round-trip doesn't panic.
        // NOTE(review): later scenarios operate on this post-merge cg1.
        {
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);

            let mut new_sst_id = 100;
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.l0.sub_levels[0],
                &mut new_sst_id,
                split_key,
            );
            let mut right_l0 = OverlappingLevel {
                sub_levels: vec![],
                total_file_size: 0,
                uncompressed_file_size: 0,
            };

            right_l0.sub_levels.push(Level {
                level_idx: 0,
                table_infos: x,
                sub_level_id: 101,
                total_file_size: 100,
                level_type: LevelType::Overlapping,
                ..Default::default()
            });

            let right_levels = Levels {
                levels: vec![],
                l0: right_l0,
                ..Default::default()
            };

            merge_levels(cg1, right_levels);
        }

        // Scenario 2: splitting a level that holds no SSTs (levels[2] was never
        // populated above) returns nothing.
        {
            let mut new_sst_id = 100;
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[2],
                &mut new_sst_id,
                split_key,
            );

            assert!(x.is_empty());
        }

        // Scenario 3: split key (table 1) sits below every L1 key, so all three
        // SSTs move to the right side. No SST is physically cut, so ids (1, 10,
        // 11) and full sizes (100 each) are retained, and the left side is empty.
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);

            let mut new_sst_id = 100;
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(3, x.len());
            assert_eq!(1, x[0].sst_id);
            assert_eq!(100, x[0].sst_size);
            assert_eq!(10, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(11, x[2].sst_id);
            assert_eq!(100, x[2].sst_size);

            assert_eq!(0, cg1.levels[0].table_infos.len());
        }

        // Scenario 4: split key (table 5) sits above every L1 key, so nothing is
        // moved and the level keeps all three SSTs.
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(5, VirtualNode::ZERO);

            let mut new_sst_id = 100;
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(0, x.len());
            assert_eq!(3, cg1.levels[0].table_infos.len());
        }

        // Scenario 5: split key (table 4) cuts through the boundary SST 10
        // (tables 3 and 4). SST 1 stays left untouched; SST 10 is split into a
        // right half (fresh id 100, table 4 side, half the size) and a left half
        // (fresh id 101, table 3 side, half the size); SST 11 moves right whole,
        // keeping its id and full size.
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);

            let mut new_sst_id = 100;
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(2, x.len());
            assert_eq!(100, x[0].sst_id);
            assert_eq!(100 / 2, x[0].sst_size);
            assert_eq!(11, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(vec![4], x[1].table_ids);

            assert_eq!(2, cg1.levels[0].table_infos.len());
            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
            assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
        }
    }
2644}