1use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_pb::hummock::{
27 CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
28};
29use tracing::warn;
30
31use super::group_split::split_sst_with_table_ids;
32use super::{StateTableId, group_split};
33use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
34use crate::compact_task::is_compaction_task_expired;
35use crate::compaction_group::StaticCompactionGroupId;
36use crate::key_range::KeyRangeCommon;
37use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
38use crate::sstable_info::SstableInfo;
39use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
40use crate::vector_index::apply_vector_index_delta;
41use crate::version::{
42 GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
43 IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
44};
45use crate::{
46 CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
47};
48
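/// Per-compaction-group summary of the SST changes carried by a version delta:
/// the level that newly inserted SSTs go to, the inserted SSTs themselves, and
/// the object ids of SSTs removed from the group.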
49#[derive(Debug, Clone, Default)]
50pub struct SstDeltaInfo {
51 pub insert_sst_level: u32,
52 pub insert_sst_infos: Vec<SstableInfo>,
53 pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
54}
55
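/// For a single SST object, records which SST ids reference it in each compaction group.
/// An object is "branched" when its SST id differs from its object id, i.e. more than one
/// SST entry (possibly across groups) points to the same underlying object.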
56pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
57
58impl<L> HummockVersionCommon<SstableInfo, L> {
59 pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
60 self.levels
61 .get(&compaction_group_id)
62 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
63 }
64
65 pub fn get_compaction_group_levels_mut(
66 &mut self,
67 compaction_group_id: CompactionGroupId,
68 ) -> &mut Levels {
69 self.levels
70 .get_mut(&compaction_group_id)
71 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
72 }
73
74 pub fn get_sst_ids_by_group_id(
76 &self,
77 compaction_group_id: CompactionGroupId,
78 ) -> impl Iterator<Item = HummockSstableId> + '_ {
79 self.levels
80 .iter()
81 .filter_map(move |(cg_id, level)| {
82 if *cg_id == compaction_group_id {
83 Some(level)
84 } else {
85 None
86 }
87 })
88 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
89 .flat_map(|level| level.table_infos.iter())
90 .map(|s| s.sst_id)
91 }
92
93 pub fn level_iter<F: FnMut(&Level) -> bool>(
94 &self,
95 compaction_group_id: CompactionGroupId,
96 mut f: F,
97 ) {
98 if let Some(levels) = self.levels.get(&compaction_group_id) {
99 for sub_level in &levels.l0.sub_levels {
100 if !f(sub_level) {
101 return;
102 }
103 }
104 for level in &levels.levels {
105 if !f(level) {
106 return;
107 }
108 }
109 }
110 }
111
112 pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
113 self.levels
115 .get(&compaction_group_id)
116 .map(|group| group.levels.len() + 1)
117 .unwrap_or(0)
118 }
119
120 pub fn safe_epoch_table_watermarks(
121 &self,
122 existing_table_ids: &[u32],
123 ) -> BTreeMap<u32, TableWatermarks> {
124 safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
125 }
126}
127
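/// For each table listed in `existing_table_ids`, keeps only the earliest (safe-epoch)
/// watermark entry of its `TableWatermarks`, dropping watermarks of all later epochs.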
128pub fn safe_epoch_table_watermarks_impl(
129 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
130 existing_table_ids: &[u32],
131) -> BTreeMap<u32, TableWatermarks> {
132 fn extract_single_table_watermark(
133 table_watermarks: &TableWatermarks,
134 ) -> Option<TableWatermarks> {
135 if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
136 Some(TableWatermarks {
137 watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
138 direction: table_watermarks.direction,
139 watermark_type: table_watermarks.watermark_type,
140 })
141 } else {
142 None
143 }
144 }
145 table_watermarks
146 .iter()
147 .filter_map(|(table_id, table_watermarks)| {
148 let u32_table_id = table_id.table_id();
149 if !existing_table_ids.contains(&u32_table_id) {
150 None
151 } else {
152 extract_single_table_watermark(table_watermarks)
153 .map(|table_watermarks| (table_id.table_id, table_watermarks))
154 }
155 })
156 .collect()
157}
158
159pub fn safe_epoch_read_table_watermarks_impl(
160 safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
161) -> BTreeMap<TableId, ReadTableWatermark> {
162 safe_epoch_watermarks
163 .into_iter()
164 .map(|(table_id, watermarks)| {
165 assert_eq!(watermarks.watermarks.len(), 1);
166 let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
167 let mut vnode_watermark_map = BTreeMap::new();
168 for vnode_watermark in vnode_watermarks.iter() {
169 let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
170 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
171 assert!(
172 vnode_watermark_map
173 .insert(vnode, watermark.clone())
174 .is_none(),
175 "duplicate table watermark on vnode {}",
176 vnode.to_index()
177 );
178 }
179 }
180 (
181 TableId::from(table_id),
182 ReadTableWatermark {
183 direction: watermarks.direction,
184 vnode_watermarks: vnode_watermark_map,
185 },
186 )
187 })
188 .collect()
189}
190
191impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
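    /// Estimates how many extra SSTs a split of `parent_group_id` at `split_key` would
    /// create: every SST whose key range has to be split on both sides of the key
    /// contributes two new SSTs.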
192 pub fn count_new_ssts_in_group_split(
193 &self,
194 parent_group_id: CompactionGroupId,
195 split_key: Bytes,
196 ) -> u64 {
197 self.levels
198 .get(&parent_group_id)
199 .map_or(0, |parent_levels| {
200 let l0 = &parent_levels.l0;
201 let mut split_count = 0;
202 for sub_level in &l0.sub_levels {
203 assert!(!sub_level.table_infos.is_empty());
204
205 if sub_level.level_type == PbLevelType::Overlapping {
206 split_count += sub_level
208 .table_infos
209 .iter()
210 .map(|sst| {
211 if let group_split::SstSplitType::Both =
212 group_split::need_to_split(sst, split_key.clone())
213 {
214 2
215 } else {
216 0
217 }
218 })
219 .sum::<u64>();
220 continue;
221 }
222
223 let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
224 let sst = sub_level.table_infos.get(pos).unwrap();
225
226 if let group_split::SstSplitType::Both =
227 group_split::need_to_split(sst, split_key.clone())
228 {
229 split_count += 2;
230 }
231 }
232
233 for level in &parent_levels.levels {
234 if level.table_infos.is_empty() {
235 continue;
236 }
237 let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
238 let sst = level.table_infos.get(pos).unwrap();
239 if let group_split::SstSplitType::Both =
240 group_split::need_to_split(sst, split_key.clone())
241 {
242 split_count += 2;
243 }
244 }
245
246 split_count
247 })
248 }
249
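    /// Initializes the newly created group `group_id` by branching, from `parent_group_id`,
    /// every SST that contains data of `member_table_ids`. Branched SSTs take fresh ids
    /// starting from `new_sst_start_id`, and parent SSTs left with no table id are removed.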
250 pub fn init_with_parent_group(
251 &mut self,
252 parent_group_id: CompactionGroupId,
253 group_id: CompactionGroupId,
254 member_table_ids: BTreeSet<StateTableId>,
255 new_sst_start_id: HummockSstableId,
256 ) {
257 let mut new_sst_id = new_sst_start_id;
258 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
259 if new_sst_start_id != 0 {
260 if cfg!(debug_assertions) {
261 panic!(
262 "non-zero sst start id {} for NewCompactionGroup",
263 new_sst_start_id
264 );
265 } else {
266 warn!(
267 %new_sst_start_id,
268 "non-zero sst start id for NewCompactionGroup"
269 );
270 }
271 }
272 return;
273 } else if !self.levels.contains_key(&parent_group_id) {
274 unreachable!(
275 "non-existing parent group id {} to init from",
276 parent_group_id
277 );
278 }
279 let [parent_levels, cur_levels] = self
280 .levels
281 .get_disjoint_mut([&parent_group_id, &group_id])
282 .map(|res| res.unwrap());
283 parent_levels.compaction_group_version_id += 1;
286 cur_levels.compaction_group_version_id += 1;
287 let l0 = &mut parent_levels.l0;
288 {
289 for sub_level in &mut l0.sub_levels {
290 let target_l0 = &mut cur_levels.l0;
291 let insert_table_infos =
294 split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
295 sub_level
296 .table_infos
297 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
298 .for_each(|sst_info| {
299 sub_level.total_file_size -= sst_info.sst_size;
300 sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
301 l0.total_file_size -= sst_info.sst_size;
302 l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
303 });
304 if insert_table_infos.is_empty() {
305 continue;
306 }
307 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
308 Ok(idx) => {
309 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
310 }
311 Err(idx) => {
312 insert_new_sub_level(
313 target_l0,
314 sub_level.sub_level_id,
315 sub_level.level_type,
316 insert_table_infos,
317 Some(idx),
318 );
319 }
320 }
321 }
322
323 l0.sub_levels.retain(|level| !level.table_infos.is_empty());
324 }
325 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
326 let insert_table_infos =
327 split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
328 cur_levels.levels[idx].total_file_size += insert_table_infos
329 .iter()
330 .map(|sst| sst.sst_size)
331 .sum::<u64>();
332 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
333 .iter()
334 .map(|sst| sst.uncompressed_file_size)
335 .sum::<u64>();
336 cur_levels.levels[idx]
337 .table_infos
338 .extend(insert_table_infos);
339 cur_levels.levels[idx]
340 .table_infos
341 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
342 assert!(can_concat(&cur_levels.levels[idx].table_infos));
343 level
344 .table_infos
345 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
346 .for_each(|sst_info| {
347 level.total_file_size -= sst_info.sst_size;
348 level.uncompressed_file_size -= sst_info.uncompressed_file_size;
349 });
350 }
351
352 assert!(
353 parent_levels
354 .l0
355 .sub_levels
356 .iter()
357 .all(|level| !level.table_infos.is_empty())
358 );
359 assert!(
360 cur_levels
361 .l0
362 .sub_levels
363 .iter()
364 .all(|level| !level.table_infos.is_empty())
365 );
366 }
367
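    /// Derives, for each compaction group touched by `version_delta`, which SSTs are newly
    /// inserted and which SST objects can be deleted. Trivial-move deltas produce no
    /// entries, and groups whose deltas are not purely intra-level changes (e.g. group
    /// construct/destroy/merge) are skipped.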
368 pub fn build_sst_delta_infos(
369 &self,
370 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
371 ) -> Vec<SstDeltaInfo> {
372 let mut infos = vec![];
373
374 if version_delta.trivial_move {
377 return infos;
378 }
379
380 for (group_id, group_deltas) in &version_delta.group_deltas {
381 let mut info = SstDeltaInfo::default();
382
383 let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
384 let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
385
386 if !group_deltas.group_deltas.iter().all(|delta| {
388 matches!(
389 delta,
390 GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
391 )
392 }) {
393 continue;
394 }
395
396 for group_delta in &group_deltas.group_deltas {
400 match group_delta {
401 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
402 if !inserted_table_infos.is_empty() {
403 info.insert_sst_level = 0;
404 info.insert_sst_infos
405 .extend(inserted_table_infos.iter().cloned());
406 }
407 }
408 GroupDeltaCommon::IntraLevel(intra_level) => {
409 if !intra_level.inserted_table_infos.is_empty() {
410 info.insert_sst_level = intra_level.level_idx;
411 info.insert_sst_infos
412 .extend(intra_level.inserted_table_infos.iter().cloned());
413 }
414 if !intra_level.removed_table_ids.is_empty() {
415 for id in &intra_level.removed_table_ids {
416 if intra_level.level_idx == 0 {
417 removed_l0_ssts.insert(*id);
418 } else {
419 removed_ssts
420 .entry(intra_level.level_idx)
421 .or_default()
422 .insert(*id);
423 }
424 }
425 }
426 }
427 GroupDeltaCommon::GroupConstruct(_)
428 | GroupDeltaCommon::GroupDestroy(_)
429 | GroupDeltaCommon::GroupMerge(_)
430 | GroupDeltaCommon::TruncateTables(_) => {}
431 }
432 }
433
434 let group = self.levels.get(group_id).unwrap();
435 for l0_sub_level in &group.level0().sub_levels {
436 for sst_info in &l0_sub_level.table_infos {
437 if removed_l0_ssts.remove(&sst_info.sst_id) {
438 info.delete_sst_object_ids.push(sst_info.object_id);
439 }
440 }
441 }
442 for level in &group.levels {
443 if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
444 for sst_info in &level.table_infos {
445 if removed_level_ssts.remove(&sst_info.sst_id) {
446 info.delete_sst_object_ids.push(sst_info.object_id);
447 }
448 }
449 if !removed_level_ssts.is_empty() {
450 tracing::error!(
451 "removed_level_ssts is not empty: {:?}",
452 removed_level_ssts,
453 );
454 }
455 debug_assert!(removed_level_ssts.is_empty());
456 }
457 }
458
459 if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
460 tracing::error!(
461 "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
462 removed_l0_ssts,
463 removed_ssts
464 );
465 }
466 debug_assert!(removed_l0_ssts.is_empty());
467 debug_assert!(removed_ssts.is_empty());
468
469 infos.push(info);
470 }
471
472 infos
473 }
474
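    /// Applies `version_delta` on top of the current version in place, replaying group
    /// deltas, table watermarks, table change logs and vector index changes. Panics if the
    /// delta's `prev_id` does not match the current version id.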
475 pub fn apply_version_delta(
476 &mut self,
477 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
478 ) {
479 assert_eq!(self.id, version_delta.prev_id);
480
481 let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
482 &version_delta.state_table_info_delta,
483 &version_delta.removed_table_ids,
484 );
485
486 #[expect(deprecated)]
487 {
488 if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
489 is_commit_epoch = true;
490 tracing::trace!(
491 "max committed epoch bumped but no table committed epoch is changed"
492 );
493 }
494 }
495
496 for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
498 let mut is_applied_l0_compact = false;
499 for group_delta in &group_deltas.group_deltas {
500 match group_delta {
501 GroupDeltaCommon::GroupConstruct(group_construct) => {
502 let mut new_levels = build_initial_compaction_group_levels(
503 *compaction_group_id,
504 group_construct.get_group_config().unwrap(),
505 );
506 let parent_group_id = group_construct.parent_group_id;
507 new_levels.parent_group_id = parent_group_id;
508 #[expect(deprecated)]
509 new_levels
511 .member_table_ids
512 .clone_from(&group_construct.table_ids);
513 self.levels.insert(*compaction_group_id, new_levels);
514 let member_table_ids = if group_construct.version()
515 >= CompatibilityVersion::NoMemberTableIds
516 {
517 self.state_table_info
518 .compaction_group_member_table_ids(*compaction_group_id)
519 .iter()
520 .map(|table_id| table_id.table_id)
521 .collect()
522 } else {
523 #[expect(deprecated)]
524 BTreeSet::from_iter(group_construct.table_ids.clone())
526 };
527
528 if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
                            let split_key = group_construct.split_key.clone().map(Bytes::from);
534 self.init_with_parent_group_v2(
535 parent_group_id,
536 *compaction_group_id,
537 group_construct.get_new_sst_start_id().into(),
538 split_key.clone(),
539 );
540 } else {
541 self.init_with_parent_group(
543 parent_group_id,
544 *compaction_group_id,
545 member_table_ids,
546 group_construct.get_new_sst_start_id().into(),
547 );
548 }
549 }
550 GroupDeltaCommon::GroupMerge(group_merge) => {
551 tracing::info!(
552 "group_merge left {:?} right {:?}",
553 group_merge.left_group_id,
554 group_merge.right_group_id
555 );
556 self.merge_compaction_group(
557 group_merge.left_group_id,
558 group_merge.right_group_id,
559 )
560 }
561 GroupDeltaCommon::IntraLevel(level_delta) => {
562 let levels =
563 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
564 panic!("compaction group {} does not exist", compaction_group_id)
565 });
566 if is_commit_epoch {
567 assert!(
568 level_delta.removed_table_ids.is_empty(),
569 "no sst should be deleted when committing an epoch"
570 );
571
572 let IntraLevelDelta {
573 level_idx,
574 l0_sub_level_id,
575 inserted_table_infos,
576 ..
577 } = level_delta;
578 {
579 assert_eq!(
580 *level_idx, 0,
581 "we should only add to L0 when we commit an epoch."
582 );
583 if !inserted_table_infos.is_empty() {
584 insert_new_sub_level(
585 &mut levels.l0,
586 *l0_sub_level_id,
587 PbLevelType::Overlapping,
588 inserted_table_infos.clone(),
589 None,
590 );
591 }
592 }
593 } else {
594 levels.apply_compact_ssts(
596 level_delta,
597 self.state_table_info
598 .compaction_group_member_table_ids(*compaction_group_id),
599 );
600 if level_delta.level_idx == 0 {
601 is_applied_l0_compact = true;
602 }
603 }
604 }
605 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
606 let levels =
607 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
608 panic!("compaction group {} does not exist", compaction_group_id)
609 });
610 assert!(is_commit_epoch);
611
612 if !inserted_table_infos.is_empty() {
613 let next_l0_sub_level_id = levels
614 .l0
615 .sub_levels
616 .last()
617 .map(|level| level.sub_level_id + 1)
618 .unwrap_or(1);
619
620 insert_new_sub_level(
621 &mut levels.l0,
622 next_l0_sub_level_id,
623 PbLevelType::Overlapping,
624 inserted_table_infos.clone(),
625 None,
626 );
627 }
628 }
629 GroupDeltaCommon::GroupDestroy(_) => {
630 self.levels.remove(compaction_group_id);
631 }
632
633 GroupDeltaCommon::TruncateTables(table_ids) => {
634 let levels =
636 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
637 panic!("compaction group {} does not exist", compaction_group_id)
638 });
639
640 for sub_level in &mut levels.l0.sub_levels {
641 sub_level.table_infos.iter_mut().for_each(|sstable_info| {
642 let mut inner = sstable_info.get_inner();
643 inner.table_ids.retain(|id| !table_ids.contains(id));
644 sstable_info.set_inner(inner);
645 });
646 }
647
648 for level in &mut levels.levels {
649 level.table_infos.iter_mut().for_each(|sstable_info| {
650 let mut inner = sstable_info.get_inner();
651 inner.table_ids.retain(|id| !table_ids.contains(id));
652 sstable_info.set_inner(inner);
653 });
654 }
655
656 levels.compaction_group_version_id += 1;
657 }
658 }
659 }
660
661 if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
662 {
663 levels.post_apply_l0_compact();
664 }
665 }
666 self.id = version_delta.id;
667 #[expect(deprecated)]
668 {
669 self.max_committed_epoch = version_delta.max_committed_epoch;
670 }
671
672 let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
676 HashMap::new();
677
678 for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
680 if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
681 if version_delta.removed_table_ids.contains(table_id) {
682 modified_table_watermarks.insert(*table_id, None);
683 } else {
684 let mut current_table_watermarks = (**current_table_watermarks).clone();
685 current_table_watermarks.apply_new_table_watermarks(table_watermarks);
686 modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
687 }
688 } else {
689 modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
690 }
691 }
692 for (table_id, table_watermarks) in &self.table_watermarks {
693 let safe_epoch = if let Some(state_table_info) =
694 self.state_table_info.info().get(table_id)
695 && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
696 && state_table_info.committed_epoch > *oldest_epoch
697 {
698 state_table_info.committed_epoch
700 } else {
701 continue;
703 };
704 let table_watermarks = modified_table_watermarks
705 .entry(*table_id)
706 .or_insert_with(|| Some((**table_watermarks).clone()));
707 if let Some(table_watermarks) = table_watermarks {
708 table_watermarks.clear_stale_epoch_watermark(safe_epoch);
709 }
710 }
711 for (table_id, table_watermarks) in modified_table_watermarks {
713 if let Some(table_watermarks) = table_watermarks {
714 self.table_watermarks
715 .insert(table_id, Arc::new(table_watermarks));
716 } else {
717 self.table_watermarks.remove(&table_id);
718 }
719 }
720
721 Self::apply_change_log_delta(
723 &mut self.table_change_log,
724 &version_delta.change_log_delta,
725 &version_delta.removed_table_ids,
726 &version_delta.state_table_info_delta,
727 &changed_table_info,
728 );
729
730 apply_vector_index_delta(
732 &mut self.vector_indexes,
733 &version_delta.vector_index_delta,
734 &version_delta.removed_table_ids,
735 );
736 }
737
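    /// Applies change log deltas to `table_change_log`: appends new log entries, drops the
    /// logs of removed tables or of tables that committed a new epoch without providing a
    /// change log, and finally truncates each log to its delta's `truncate_epoch`.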
738 pub fn apply_change_log_delta<T: Clone>(
739 table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
740 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
741 removed_table_ids: &HashSet<TableId>,
742 state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
743 changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
744 ) {
745 for (table_id, change_log_delta) in change_log_delta {
746 let new_change_log = &change_log_delta.new_log;
747 match table_change_log.entry(*table_id) {
748 Entry::Occupied(entry) => {
749 let change_log = entry.into_mut();
750 change_log.add_change_log(new_change_log.clone());
751 }
752 Entry::Vacant(entry) => {
753 entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
754 }
755 };
756 }
757
758 table_change_log.retain(|table_id, _| {
762 if removed_table_ids.contains(table_id) {
763 return false;
764 }
            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
                && table_info_delta.committed_epoch > prev_table_info.committed_epoch
            {
                // The table already existed and has newly committed an epoch in this delta:
                // fall through to check whether a change log delta is provided for it.
            } else {
                // Otherwise keep the table change log untouched.
                return true;
            }
772 let contains = change_log_delta.contains_key(table_id);
773 if !contains {
774 warn!(
775 ?table_id,
776 "table change log dropped due to no further change log at newly committed epoch",
777 );
778 }
779 contains
780 });
781
782 for (table_id, change_log_delta) in change_log_delta {
784 if let Some(change_log) = table_change_log.get_mut(table_id) {
785 change_log.truncate(change_log_delta.truncate_epoch);
786 }
787 }
788 }
789
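    /// Collects all branched SST objects in the version, i.e. objects whose SST id differs
    /// from the object id, grouped by object id and then by compaction group.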
790 pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
791 let mut ret: BTreeMap<_, _> = BTreeMap::new();
792 for (compaction_group_id, group) in &self.levels {
793 let mut levels = vec![];
794 levels.extend(group.l0.sub_levels.iter());
795 levels.extend(group.levels.iter());
796 for level in levels {
797 for table_info in &level.table_infos {
798 if table_info.sst_id.inner() == table_info.object_id.inner() {
799 continue;
800 }
801 let object_id = table_info.object_id;
802 let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
803 entry
804 .entry(*compaction_group_id)
805 .or_default()
806 .push(table_info.sst_id)
807 }
808 }
809 }
810 ret
811 }
812
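    /// Merges `right_group_id` into `left_group_id` and removes the right group from the
    /// version. The member table ids of the left group must all be ordered before those of
    /// the right group so that the merged levels remain sorted.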
813 pub fn merge_compaction_group(
814 &mut self,
815 left_group_id: CompactionGroupId,
816 right_group_id: CompactionGroupId,
817 ) {
818 let left_group_id_table_ids = self
820 .state_table_info
821 .compaction_group_member_table_ids(left_group_id)
822 .iter()
823 .map(|table_id| table_id.table_id);
824 let right_group_id_table_ids = self
825 .state_table_info
826 .compaction_group_member_table_ids(right_group_id)
827 .iter()
828 .map(|table_id| table_id.table_id);
829
830 assert!(
831 left_group_id_table_ids
832 .chain(right_group_id_table_ids)
833 .is_sorted()
834 );
835
836 let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
837 let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
838 panic!(
839 "compaction group should exist right {} all {:?}",
840 right_group_id, total_cg
841 )
842 });
843
844 let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
845 panic!(
846 "compaction group should exist left {} all {:?}",
847 left_group_id, total_cg
848 )
849 });
850
851 group_split::merge_levels(left_levels, right_levels);
852 }
853
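    /// Split-key based variant of `init_with_parent_group`: branches the parent group's
    /// SSTs into `group_id` by splitting them at `split_key` instead of by member table
    /// ids. Used for `CompatibilityVersion::SplitGroupByTableId` and later.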
854 pub fn init_with_parent_group_v2(
855 &mut self,
856 parent_group_id: CompactionGroupId,
857 group_id: CompactionGroupId,
858 new_sst_start_id: HummockSstableId,
859 split_key: Option<Bytes>,
860 ) {
861 let mut new_sst_id = new_sst_start_id;
862 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
863 if new_sst_start_id != 0 {
864 if cfg!(debug_assertions) {
865 panic!(
866 "non-zero sst start id {} for NewCompactionGroup",
867 new_sst_start_id
868 );
869 } else {
870 warn!(
871 %new_sst_start_id,
872 "non-zero sst start id for NewCompactionGroup"
873 );
874 }
875 }
876 return;
877 } else if !self.levels.contains_key(&parent_group_id) {
878 unreachable!(
879 "non-existing parent group id {} to init from (V2)",
880 parent_group_id
881 );
882 }
883
884 let [parent_levels, cur_levels] = self
885 .levels
886 .get_disjoint_mut([&parent_group_id, &group_id])
887 .map(|res| res.unwrap());
888 parent_levels.compaction_group_version_id += 1;
891 cur_levels.compaction_group_version_id += 1;
892
893 let l0 = &mut parent_levels.l0;
894 {
895 for sub_level in &mut l0.sub_levels {
896 let target_l0 = &mut cur_levels.l0;
897 let insert_table_infos = if let Some(split_key) = &split_key {
900 group_split::split_sst_info_for_level_v2(
901 sub_level,
902 &mut new_sst_id,
903 split_key.clone(),
904 )
905 } else {
906 vec![]
907 };
908
909 if insert_table_infos.is_empty() {
910 continue;
911 }
912
913 sub_level
914 .table_infos
915 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
916 .for_each(|sst_info| {
917 sub_level.total_file_size -= sst_info.sst_size;
918 sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
919 l0.total_file_size -= sst_info.sst_size;
920 l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
921 });
922 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
923 Ok(idx) => {
924 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
925 }
926 Err(idx) => {
927 insert_new_sub_level(
928 target_l0,
929 sub_level.sub_level_id,
930 sub_level.level_type,
931 insert_table_infos,
932 Some(idx),
933 );
934 }
935 }
936 }
937
938 l0.sub_levels.retain(|level| !level.table_infos.is_empty());
939 }
940
941 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
942 let insert_table_infos = if let Some(split_key) = &split_key {
943 group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
944 } else {
945 vec![]
946 };
947
948 if insert_table_infos.is_empty() {
949 continue;
950 }
951
952 cur_levels.levels[idx].total_file_size += insert_table_infos
953 .iter()
954 .map(|sst| sst.sst_size)
955 .sum::<u64>();
956 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
957 .iter()
958 .map(|sst| sst.uncompressed_file_size)
959 .sum::<u64>();
960 cur_levels.levels[idx]
961 .table_infos
962 .extend(insert_table_infos);
963 cur_levels.levels[idx]
964 .table_infos
965 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
966 assert!(can_concat(&cur_levels.levels[idx].table_infos));
967 level
968 .table_infos
969 .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
970 .for_each(|sst_info| {
971 level.total_file_size -= sst_info.sst_size;
972 level.uncompressed_file_size -= sst_info.uncompressed_file_size;
973 });
974 }
975
976 assert!(
977 parent_levels
978 .l0
979 .sub_levels
980 .iter()
981 .all(|level| !level.table_infos.is_empty())
982 );
983 assert!(
984 cur_levels
985 .l0
986 .sub_levels
987 .iter()
988 .all(|level| !level.table_infos.is_empty())
989 );
990 }
991}
992
993impl<T> HummockVersionCommon<T>
994where
995 T: SstableIdReader + ObjectIdReader,
996{
997 pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
998 self.get_sst_infos(exclude_change_log)
999 .map(|s| s.object_id())
1000 .collect()
1001 }
1002
1003 pub fn get_object_ids(
1004 &self,
1005 exclude_change_log: bool,
1006 ) -> impl Iterator<Item = HummockObjectId> + '_ {
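        // Exhaustive match on a dummy value: adding a new `HummockObjectId` variant breaks
        // this match at compile time, reminding us to also return that object type below.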
1007 match HummockObjectId::Sstable(0.into()) {
1011 HummockObjectId::Sstable(_) => {}
1012 HummockObjectId::VectorFile(_) => {}
1013 HummockObjectId::HnswGraphFile(_) => {}
1014 };
1015 self.get_sst_infos(exclude_change_log)
1016 .map(|s| HummockObjectId::Sstable(s.object_id()))
1017 .chain(
1018 self.vector_indexes
1019 .values()
1020 .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
1021 )
1022 }
1023
1024 pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
1025 self.get_sst_infos(exclude_change_log)
1026 .map(|s| s.sst_id())
1027 .collect()
1028 }
1029
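    /// Iterates over all SST infos referenced by the version: every level of every
    /// compaction group, plus (unless `exclude_change_log` is set) the old and new values
    /// recorded in the table change logs.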
1030 pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
1031 let may_table_change_log = if exclude_change_log {
1032 None
1033 } else {
1034 Some(self.table_change_log.values())
1035 };
1036 self.get_combined_levels()
1037 .flat_map(|level| level.table_infos.iter())
1038 .chain(
1039 may_table_change_log
1040 .map(|v| {
1041 v.flat_map(|table_change_log| {
1042 table_change_log.iter().flat_map(|epoch_change_log| {
1043 epoch_change_log
1044 .old_value
1045 .iter()
1046 .chain(epoch_change_log.new_value.iter())
1047 })
1048 })
1049 })
1050 .into_iter()
1051 .flatten(),
1052 )
1053 }
1054}
1055
1056impl Levels {
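    /// Applies a compaction-generated intra-level delta: removes the input SSTs and inserts
    /// the output SSTs into the target (sub-)level. The delta is ignored if its compaction
    /// group version id shows that the compact task has expired.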
1057 pub(crate) fn apply_compact_ssts(
1058 &mut self,
1059 level_delta: &IntraLevelDeltaCommon<SstableInfo>,
1060 member_table_ids: &BTreeSet<TableId>,
1061 ) {
1062 let IntraLevelDeltaCommon {
1063 level_idx,
1064 l0_sub_level_id,
1065 inserted_table_infos: insert_table_infos,
1066 vnode_partition_count,
1067 removed_table_ids: delete_sst_ids_set,
1068 compaction_group_version_id,
1069 } = level_delta;
1070 let new_vnode_partition_count = *vnode_partition_count;
1071
1072 if is_compaction_task_expired(
1073 self.compaction_group_version_id,
1074 *compaction_group_version_id,
1075 ) {
1076 warn!(
1077 current_compaction_group_version_id = self.compaction_group_version_id,
1078 delta_compaction_group_version_id = compaction_group_version_id,
1079 level_idx,
1080 l0_sub_level_id,
1081 insert_table_infos = ?insert_table_infos
1082 .iter()
1083 .map(|sst| (sst.sst_id, sst.object_id))
1084 .collect_vec(),
1085 ?delete_sst_ids_set,
1086 "This VersionDelta may be committed by an expired compact task. Please check it."
1087 );
1088 return;
1089 }
1090 if !delete_sst_ids_set.is_empty() {
1091 if *level_idx == 0 {
1092 for level in &mut self.l0.sub_levels {
1093 level_delete_ssts(level, delete_sst_ids_set);
1094 }
1095 } else {
1096 let idx = *level_idx as usize - 1;
1097 level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
1098 }
1099 }
1100
1101 if !insert_table_infos.is_empty() {
1102 let insert_sst_level_id = *level_idx;
1103 let insert_sub_level_id = *l0_sub_level_id;
1104 if insert_sst_level_id == 0 {
1105 let l0 = &mut self.l0;
1106 let index = l0
1107 .sub_levels
1108 .partition_point(|level| level.sub_level_id < insert_sub_level_id);
1109 assert!(
1110 index < l0.sub_levels.len()
1111 && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
1112 "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
1113 insert_sub_level_id,
1114 delete_sst_ids_set,
1115 l0.sub_levels
1116 .iter()
1117 .map(|level| level.sub_level_id)
1118 .collect_vec()
1119 );
1120 if l0.sub_levels[index].table_infos.is_empty()
1121 && member_table_ids.len() == 1
1122 && insert_table_infos.iter().all(|sst| {
1123 sst.table_ids.len() == 1
1124 && sst.table_ids[0]
1125 == member_table_ids.iter().next().expect("non-empty").table_id
1126 })
1127 {
1128 l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
1131 }
1132 level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
1133 } else {
1134 let idx = insert_sst_level_id as usize - 1;
1135 if self.levels[idx].table_infos.is_empty()
1136 && insert_table_infos
1137 .iter()
1138 .all(|sst| sst.table_ids.len() == 1)
1139 {
1140 self.levels[idx].vnode_partition_count = new_vnode_partition_count;
1141 } else if self.levels[idx].vnode_partition_count != 0
1142 && new_vnode_partition_count == 0
1143 && member_table_ids.len() > 1
1144 {
1145 self.levels[idx].vnode_partition_count = 0;
1146 }
1147 level_insert_ssts(&mut self.levels[idx], insert_table_infos);
1148 }
1149 }
1150 }
1151
1152 pub(crate) fn post_apply_l0_compact(&mut self) {
1153 {
1154 self.l0
1155 .sub_levels
1156 .retain(|level| !level.table_infos.is_empty());
1157 self.l0.total_file_size = self
1158 .l0
1159 .sub_levels
1160 .iter()
1161 .map(|level| level.total_file_size)
1162 .sum::<u64>();
1163 self.l0.uncompressed_file_size = self
1164 .l0
1165 .sub_levels
1166 .iter()
1167 .map(|level| level.uncompressed_file_size)
1168 .sum::<u64>();
1169 }
1170 }
1171}
1172
1173impl<T, L> HummockVersionCommon<T, L> {
1174 pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1175 self.levels
1176 .values()
1177 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1178 }
1179}
1180
1181pub fn build_initial_compaction_group_levels(
1182 group_id: CompactionGroupId,
1183 compaction_config: &CompactionConfig,
1184) -> Levels {
1185 let mut levels = vec![];
1186 for l in 0..compaction_config.get_max_level() {
1187 levels.push(Level {
1188 level_idx: (l + 1) as u32,
1189 level_type: PbLevelType::Nonoverlapping,
1190 table_infos: vec![],
1191 total_file_size: 0,
1192 sub_level_id: 0,
1193 uncompressed_file_size: 0,
1194 vnode_partition_count: 0,
1195 });
1196 }
    #[expect(deprecated)]
    Levels {
1199 levels,
1200 l0: OverlappingLevel {
1201 sub_levels: vec![],
1202 total_file_size: 0,
1203 uncompressed_file_size: 0,
1204 },
1205 group_id,
1206 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
1207 member_table_ids: vec![],
1208 compaction_group_version_id: 0,
1209 }
1210}
1211
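/// For every SST in `level` that contains data of `member_table_ids`, splits the SST into
/// the original entry and a branched copy (with a fresh id taken from `new_sst_id`) for the
/// new group, and returns the branched SSTs to be inserted into the new group's level.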
1212fn split_sst_info_for_level(
1213 member_table_ids: &BTreeSet<u32>,
1214 level: &mut Level,
1215 new_sst_id: &mut HummockSstableId,
1216) -> Vec<SstableInfo> {
1217 let mut insert_table_infos = vec![];
1220 for sst_info in &mut level.table_infos {
1221 let removed_table_ids = sst_info
1222 .table_ids
1223 .iter()
1224 .filter(|table_id| member_table_ids.contains(table_id))
1225 .cloned()
1226 .collect_vec();
1227 let sst_size = sst_info.sst_size;
        // The estimated size is halved between the retained SST and the branched SST below,
        // so warn when it is too small to be meaningfully split.
        if sst_size / 2 == 0 {
            tracing::warn!(
                id = %sst_info.sst_id,
                object_id = %sst_info.object_id,
                sst_size = sst_info.sst_size,
                file_size = sst_info.file_size,
                "Sstable sst_size is under expected",
            );
        }
1237 if !removed_table_ids.is_empty() {
1238 let (modified_sst, branch_sst) = split_sst_with_table_ids(
1239 sst_info,
1240 new_sst_id,
1241 sst_size / 2,
1242 sst_size / 2,
1243 member_table_ids.iter().cloned().collect_vec(),
1244 );
1245 *sst_info = modified_sst;
1246 insert_table_infos.push(branch_sst);
1247 }
1248 }
1249 insert_table_infos
1250}
1251
1252pub fn get_compaction_group_ids(
1254 version: &HummockVersion,
1255) -> impl Iterator<Item = CompactionGroupId> + '_ {
1256 version.levels.keys().cloned()
1257}
1258
1259pub fn get_table_compaction_group_id_mapping(
1260 version: &HummockVersion,
1261) -> HashMap<StateTableId, CompactionGroupId> {
1262 version
1263 .state_table_info
1264 .info()
1265 .iter()
1266 .map(|(table_id, info)| (table_id.table_id, info.compaction_group_id))
1267 .collect()
1268}
1269
1270pub fn get_compaction_group_ssts(
1272 version: &HummockVersion,
1273 group_id: CompactionGroupId,
1274) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1275 let group_levels = version.get_compaction_group_levels(group_id);
1276 group_levels
1277 .l0
1278 .sub_levels
1279 .iter()
1280 .rev()
1281 .chain(group_levels.levels.iter())
1282 .flat_map(|level| {
1283 level
1284 .table_infos
1285 .iter()
1286 .map(|table_info| (table_info.object_id, table_info.sst_id))
1287 })
1288}
1289
1290pub fn new_sub_level(
1291 sub_level_id: u64,
1292 level_type: PbLevelType,
1293 table_infos: Vec<SstableInfo>,
1294) -> Level {
1295 if level_type == PbLevelType::Nonoverlapping {
1296 debug_assert!(
1297 can_concat(&table_infos),
1298 "sst of non-overlapping level is not concat-able: {:?}",
1299 table_infos
1300 );
1301 }
1302 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1303 let uncompressed_file_size = table_infos
1304 .iter()
1305 .map(|table| table.uncompressed_file_size)
1306 .sum();
1307 Level {
1308 level_idx: 0,
1309 level_type,
1310 table_infos,
1311 total_file_size,
1312 sub_level_id,
1313 uncompressed_file_size,
1314 vnode_partition_count: 0,
1315 }
1316}
1317
1318pub fn add_ssts_to_sub_level(
1319 l0: &mut OverlappingLevel,
1320 sub_level_idx: usize,
1321 insert_table_infos: Vec<SstableInfo>,
1322) {
1323 insert_table_infos.iter().for_each(|sst| {
1324 l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1325 l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1326 l0.total_file_size += sst.sst_size;
1327 l0.uncompressed_file_size += sst.uncompressed_file_size;
1328 });
1329 l0.sub_levels[sub_level_idx]
1330 .table_infos
1331 .extend(insert_table_infos);
1332 if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1333 l0.sub_levels[sub_level_idx]
1334 .table_infos
1335 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1336 assert!(
1337 can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1338 "sstable ids: {:?}",
1339 l0.sub_levels[sub_level_idx]
1340 .table_infos
1341 .iter()
1342 .map(|sst| sst.sst_id)
1343 .collect_vec()
1344 );
1345 }
1346}
1347
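/// Builds a new L0 sub level from `insert_table_infos` and inserts it into `l0`, either at
/// the position given by `sub_level_insert_hint` or, when no hint is given, at the end
/// (asserting that `insert_sub_level_id` is larger than the current newest sub level id).
/// A `insert_sub_level_id` of `u64::MAX` is ignored.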
1348pub fn insert_new_sub_level(
1350 l0: &mut OverlappingLevel,
1351 insert_sub_level_id: u64,
1352 level_type: PbLevelType,
1353 insert_table_infos: Vec<SstableInfo>,
1354 sub_level_insert_hint: Option<usize>,
1355) {
1356 if insert_sub_level_id == u64::MAX {
1357 return;
1358 }
1359 let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1360 insert_pos
1361 } else {
1362 if let Some(newest_level) = l0.sub_levels.last() {
1363 assert!(
1364 newest_level.sub_level_id < insert_sub_level_id,
1365 "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1366 newest_level.sub_level_id,
1367 insert_sub_level_id,
1368 l0,
1369 );
1370 }
1371 l0.sub_levels.len()
1372 };
1373 #[cfg(debug_assertions)]
1374 {
1375 if insert_pos > 0
1376 && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1377 {
1378 debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1379 }
1380 if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1381 debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1382 }
1383 }
1384 let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1387 l0.total_file_size += level.total_file_size;
1388 l0.uncompressed_file_size += level.uncompressed_file_size;
1389 l0.sub_levels.insert(insert_pos, level);
1390}
1391
1392fn level_delete_ssts(
1396 operand: &mut Level,
1397 delete_sst_ids_superset: &HashSet<HummockSstableId>,
1398) -> bool {
1399 let original_len = operand.table_infos.len();
1400 operand
1401 .table_infos
1402 .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1403 operand.total_file_size = operand
1404 .table_infos
1405 .iter()
1406 .map(|table| table.sst_size)
1407 .sum::<u64>();
1408 operand.uncompressed_file_size = operand
1409 .table_infos
1410 .iter()
1411 .map(|table| table.uncompressed_file_size)
1412 .sum::<u64>();
1413 original_len != operand.table_infos.len()
1414}
1415
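/// Inserts compaction output SSTs into `operand` while keeping it non-overlapping: size
/// statistics are updated, an overlapping level is converted to non-overlapping and sorted,
/// and the result is asserted to remain concatenable by key range.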
1416fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1417 fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1418 format!(
1419 "sstable ids: {:?}",
1420 ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1421 )
1422 }
1423 operand.total_file_size += insert_table_infos
1424 .iter()
1425 .map(|sst| sst.sst_size)
1426 .sum::<u64>();
1427 operand.uncompressed_file_size += insert_table_infos
1428 .iter()
1429 .map(|sst| sst.uncompressed_file_size)
1430 .sum::<u64>();
1431 if operand.level_type == PbLevelType::Overlapping {
1432 operand.level_type = PbLevelType::Nonoverlapping;
1433 operand
1434 .table_infos
1435 .extend(insert_table_infos.iter().cloned());
1436 operand
1437 .table_infos
1438 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1439 assert!(
1440 can_concat(&operand.table_infos),
1441 "{}",
1442 display_sstable_infos(&operand.table_infos)
1443 );
1444 } else if !insert_table_infos.is_empty() {
1445 let sorted_insert: Vec<_> = insert_table_infos
1446 .iter()
1447 .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1448 .cloned()
1449 .collect();
1450 let first = &sorted_insert[0];
1451 let last = &sorted_insert[sorted_insert.len() - 1];
1452 let pos = operand
1453 .table_infos
1454 .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1455 if pos >= operand.table_infos.len()
1456 || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1457 {
1458 operand.table_infos.splice(pos..pos, sorted_insert);
1459 let validate_range = operand
1461 .table_infos
1462 .iter()
1463 .skip(pos.saturating_sub(1))
1464 .take(insert_table_infos.len() + 2)
1465 .collect_vec();
1466 assert!(
1467 can_concat(&validate_range),
1468 "{}",
1469 display_sstable_infos(&validate_range),
1470 );
1471 } else {
1472 warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1475 for i in insert_table_infos {
1476 let pos = operand
1477 .table_infos
1478 .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1479 operand.table_infos.insert(pos, i.clone());
1480 }
1481 assert!(
1482 can_concat(&operand.table_infos),
1483 "{}",
1484 display_sstable_infos(&operand.table_infos)
1485 );
1486 }
1487 }
1488}
1489
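/// Maps every object referenced by `version` to its file size, covering the SSTs of all
/// levels, the SSTs referenced by table change logs, and vector index files.
///
/// A minimal usage sketch (assuming a `HummockVersion` value named `version`):
/// ```ignore
/// let total_bytes: u64 = object_size_map(&version).values().copied().sum();
/// ```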
1490pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
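    // Exhaustive match on a dummy value: adding a new `HummockObjectId` variant breaks this
    // match at compile time, reminding us to also account for that object type below.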
1491 match HummockObjectId::Sstable(0.into()) {
1495 HummockObjectId::Sstable(_) => {}
1496 HummockObjectId::VectorFile(_) => {}
1497 HummockObjectId::HnswGraphFile(_) => {}
1498 };
1499 version
1500 .levels
1501 .values()
1502 .flat_map(|cg| {
1503 cg.level0()
1504 .sub_levels
1505 .iter()
1506 .chain(cg.levels.iter())
1507 .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1508 })
1509 .chain(version.table_change_log.values().flat_map(|c| {
1510 c.iter().flat_map(|l| {
1511 l.old_value
1512 .iter()
1513 .chain(l.new_value.iter())
1514 .map(|t| (t.object_id, t.file_size))
1515 })
1516 }))
1517 .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1518 .chain(
1519 version
1520 .vector_indexes
1521 .values()
1522 .flat_map(|index| index.get_objects()),
1523 )
1524 .collect()
1525}
1526
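/// Performs sanity checks on `version` (consistent group ids, level indexes, ordering of L0
/// sub levels, sorted table ids and non-overlapping key ranges) and returns a human-readable
/// description of each violation found.
///
/// A minimal usage sketch (assuming a `HummockVersion` value named `version`):
/// ```ignore
/// for issue in validate_version(&version) {
///     tracing::warn!(%issue, "invalid hummock version");
/// }
/// ```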
1527pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1530 let mut res = Vec::new();
1531 for (group_id, levels) in &version.levels {
1533 if levels.group_id != *group_id {
1535 res.push(format!(
1536 "GROUP {}: inconsistent group id {} in Levels",
1537 group_id, levels.group_id
1538 ));
1539 }
1540
1541 let validate_level = |group: CompactionGroupId,
1542 expected_level_idx: u32,
1543 level: &Level,
1544 res: &mut Vec<String>| {
1545 let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1546 if level.level_idx == 0 {
                level_identifier.push_str(format!(" SUBLEVEL {}", level.sub_level_id).as_str());
1548 if level.table_infos.is_empty() {
1550 res.push(format!("{}: empty level", level_identifier));
1551 }
1552 } else if level.level_type != PbLevelType::Nonoverlapping {
1553 res.push(format!(
1555 "{}: level type {:?} is not non-overlapping",
1556 level_identifier, level.level_type
1557 ));
1558 }
1559
1560 if level.level_idx != expected_level_idx {
1562 res.push(format!(
1563 "{}: mismatched level idx {}",
1564 level_identifier, expected_level_idx
1565 ));
1566 }
1567
1568 let mut prev_table_info: Option<&SstableInfo> = None;
1569 for table_info in &level.table_infos {
1570 if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1572 res.push(format!(
1573 "{} SST {}: table_ids not sorted",
1574 level_identifier, table_info.object_id
1575 ));
1576 }
1577
1578 if level.level_type == PbLevelType::Nonoverlapping {
1580 if let Some(prev) = prev_table_info.take()
1581 && prev
1582 .key_range
1583 .compare_right_with(&table_info.key_range.left)
1584 != Ordering::Less
1585 {
1586 res.push(format!(
1587 "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1588 level_identifier, table_info.object_id, prev, table_info
1589 ));
1590 }
1591 let _ = prev_table_info.insert(table_info);
1592 }
1593 }
1594 };
1595
1596 let l0 = &levels.l0;
1597 let mut prev_sub_level_id = u64::MAX;
1598 for sub_level in &l0.sub_levels {
1599 if sub_level.sub_level_id >= prev_sub_level_id {
                res.push(format!(
                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
                    group_id, sub_level.sub_level_id, prev_sub_level_id
                ));
1605 }
1606 prev_sub_level_id = sub_level.sub_level_id;
1607
1608 validate_level(*group_id, 0, sub_level, &mut res);
1609 }
1610
1611 for idx in 1..=levels.levels.len() {
1612 validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1613 }
1614 }
1615 res
1616}
1617
1618#[cfg(test)]
1619mod tests {
1620 use std::collections::{HashMap, HashSet};
1621
1622 use bytes::Bytes;
1623 use risingwave_common::catalog::TableId;
1624 use risingwave_common::hash::VirtualNode;
1625 use risingwave_common::util::epoch::test_epoch;
1626 use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1627
1628 use super::group_split;
1629 use crate::HummockVersionId;
1630 use crate::compaction_group::group_split::*;
1631 use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1632 use crate::key::{FullKey, gen_key_from_str};
1633 use crate::key_range::KeyRange;
1634 use crate::level::{Level, Levels, OverlappingLevel};
1635 use crate::sstable_info::{SstableInfo, SstableInfoInner};
1636 use crate::version::{
1637 GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1638 };
1639
1640 fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1641 gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1642 }
1643
1644 fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1645 let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1646 let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1647 let full_key_l = FullKey::for_test(
1648 TableId::new(*table_ids.first().unwrap()),
1649 table_key_l,
1650 epoch,
1651 )
1652 .encode();
1653 let full_key_r =
1654 FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1655 .encode();
1656
1657 SstableInfoInner {
1658 sst_id: sst_id.into(),
1659 key_range: KeyRange {
1660 left: full_key_l.into(),
1661 right: full_key_r.into(),
1662 right_exclusive: false,
1663 },
1664 table_ids,
1665 object_id: sst_id.into(),
1666 min_epoch: 20,
1667 max_epoch: 20,
1668 file_size: 100,
1669 sst_size: 100,
1670 ..Default::default()
1671 }
1672 }
1673
1674 #[test]
1675 fn test_get_sst_object_ids() {
1676 let mut version = HummockVersion {
1677 id: HummockVersionId::new(0),
1678 levels: HashMap::from_iter([(
1679 0,
1680 Levels {
1681 levels: vec![],
1682 l0: OverlappingLevel {
1683 sub_levels: vec![],
1684 total_file_size: 0,
1685 uncompressed_file_size: 0,
1686 },
1687 ..Default::default()
1688 },
1689 )]),
1690 ..Default::default()
1691 };
1692 assert_eq!(version.get_object_ids(false).count(), 0);
1693
1694 version
1696 .levels
1697 .get_mut(&0)
1698 .unwrap()
1699 .l0
1700 .sub_levels
1701 .push(Level {
1702 table_infos: vec![
1703 SstableInfoInner {
1704 object_id: 11.into(),
1705 sst_id: 11.into(),
1706 ..Default::default()
1707 }
1708 .into(),
1709 ],
1710 ..Default::default()
1711 });
1712 assert_eq!(version.get_object_ids(false).count(), 1);
1713
1714 version.levels.get_mut(&0).unwrap().levels.push(Level {
1716 table_infos: vec![
1717 SstableInfoInner {
1718 object_id: 22.into(),
1719 sst_id: 22.into(),
1720 ..Default::default()
1721 }
1722 .into(),
1723 ],
1724 ..Default::default()
1725 });
1726 assert_eq!(version.get_object_ids(false).count(), 2);
1727 }
1728
1729 #[test]
1730 fn test_apply_version_delta() {
1731 let mut version = HummockVersion {
1732 id: HummockVersionId::new(0),
1733 levels: HashMap::from_iter([
1734 (
1735 0,
1736 build_initial_compaction_group_levels(
1737 0,
1738 &CompactionConfig {
1739 max_level: 6,
1740 ..Default::default()
1741 },
1742 ),
1743 ),
1744 (
1745 1,
1746 build_initial_compaction_group_levels(
1747 1,
1748 &CompactionConfig {
1749 max_level: 6,
1750 ..Default::default()
1751 },
1752 ),
1753 ),
1754 ]),
1755 ..Default::default()
1756 };
1757 let version_delta = HummockVersionDelta {
1758 id: HummockVersionId::new(1),
1759 group_deltas: HashMap::from_iter([
1760 (
1761 2,
1762 GroupDeltas {
1763 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1764 group_config: Some(CompactionConfig {
1765 max_level: 6,
1766 ..Default::default()
1767 }),
1768 ..Default::default()
1769 }))],
1770 },
1771 ),
1772 (
1773 0,
1774 GroupDeltas {
1775 group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1776 },
1777 ),
1778 (
1779 1,
1780 GroupDeltas {
1781 group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1782 1,
1783 0,
1784 HashSet::new(),
1785 vec![
1786 SstableInfoInner {
1787 object_id: 1.into(),
1788 sst_id: 1.into(),
1789 ..Default::default()
1790 }
1791 .into(),
1792 ],
1793 0,
1794 version
1795 .levels
1796 .get(&1)
1797 .as_ref()
1798 .unwrap()
1799 .compaction_group_version_id,
1800 ))],
1801 },
1802 ),
1803 ]),
1804 ..Default::default()
1805 };
1806 let version_delta = version_delta;
1807
1808 version.apply_version_delta(&version_delta);
1809 let mut cg1 = build_initial_compaction_group_levels(
1810 1,
1811 &CompactionConfig {
1812 max_level: 6,
1813 ..Default::default()
1814 },
1815 );
1816 cg1.levels[0] = Level {
1817 level_idx: 1,
1818 level_type: LevelType::Nonoverlapping,
1819 table_infos: vec![
1820 SstableInfoInner {
1821 object_id: 1.into(),
1822 sst_id: 1.into(),
1823 ..Default::default()
1824 }
1825 .into(),
1826 ],
1827 ..Default::default()
1828 };
1829 assert_eq!(
1830 version,
1831 HummockVersion {
1832 id: HummockVersionId::new(1),
1833 levels: HashMap::from_iter([
1834 (
1835 2,
1836 build_initial_compaction_group_levels(
1837 2,
1838 &CompactionConfig {
1839 max_level: 6,
1840 ..Default::default()
1841 },
1842 ),
1843 ),
1844 (1, cg1),
1845 ]),
1846 ..Default::default()
1847 }
1848 );
1849 }
1850
1851 fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1852 gen_sst_info_impl(object_id, table_ids, left, right).into()
1853 }
1854
1855 fn gen_sst_info_impl(
1856 object_id: u64,
1857 table_ids: Vec<u32>,
1858 left: Bytes,
1859 right: Bytes,
1860 ) -> SstableInfoInner {
1861 SstableInfoInner {
1862 object_id: object_id.into(),
1863 sst_id: object_id.into(),
1864 key_range: KeyRange {
1865 left,
1866 right,
1867 right_exclusive: false,
1868 },
1869 table_ids,
1870 file_size: 100,
1871 sst_size: 100,
1872 uncompressed_file_size: 100,
1873 ..Default::default()
1874 }
1875 }
1876
1877 #[test]
1878 fn test_merge_levels() {
1879 let mut left_levels = build_initial_compaction_group_levels(
1880 1,
1881 &CompactionConfig {
1882 max_level: 6,
1883 ..Default::default()
1884 },
1885 );
1886
1887 let mut right_levels = build_initial_compaction_group_levels(
1888 2,
1889 &CompactionConfig {
1890 max_level: 6,
1891 ..Default::default()
1892 },
1893 );
1894
1895 left_levels.levels[0] = Level {
1896 level_idx: 1,
1897 level_type: LevelType::Nonoverlapping,
1898 table_infos: vec![
1899 gen_sst_info(
1900 1,
1901 vec![3],
1902 FullKey::for_test(
1903 TableId::new(3),
1904 gen_key_from_str(VirtualNode::from_index(1), "1"),
1905 0,
1906 )
1907 .encode()
1908 .into(),
1909 FullKey::for_test(
1910 TableId::new(3),
1911 gen_key_from_str(VirtualNode::from_index(200), "1"),
1912 0,
1913 )
1914 .encode()
1915 .into(),
1916 ),
1917 gen_sst_info(
1918 10,
1919 vec![3, 4],
1920 FullKey::for_test(
1921 TableId::new(3),
1922 gen_key_from_str(VirtualNode::from_index(201), "1"),
1923 0,
1924 )
1925 .encode()
1926 .into(),
1927 FullKey::for_test(
1928 TableId::new(4),
1929 gen_key_from_str(VirtualNode::from_index(10), "1"),
1930 0,
1931 )
1932 .encode()
1933 .into(),
1934 ),
1935 gen_sst_info(
1936 11,
1937 vec![4],
1938 FullKey::for_test(
1939 TableId::new(4),
1940 gen_key_from_str(VirtualNode::from_index(11), "1"),
1941 0,
1942 )
1943 .encode()
1944 .into(),
1945 FullKey::for_test(
1946 TableId::new(4),
1947 gen_key_from_str(VirtualNode::from_index(200), "1"),
1948 0,
1949 )
1950 .encode()
1951 .into(),
1952 ),
1953 ],
1954 total_file_size: 300,
1955 ..Default::default()
1956 };
1957
1958 left_levels.l0.sub_levels.push(Level {
1959 level_idx: 0,
1960 table_infos: vec![gen_sst_info(
1961 3,
1962 vec![3],
1963 FullKey::for_test(
1964 TableId::new(3),
1965 gen_key_from_str(VirtualNode::from_index(1), "1"),
1966 0,
1967 )
1968 .encode()
1969 .into(),
1970 FullKey::for_test(
1971 TableId::new(3),
1972 gen_key_from_str(VirtualNode::from_index(200), "1"),
1973 0,
1974 )
1975 .encode()
1976 .into(),
1977 )],
1978 sub_level_id: 101,
1979 level_type: LevelType::Overlapping,
1980 total_file_size: 100,
1981 ..Default::default()
1982 });
1983
1984 left_levels.l0.sub_levels.push(Level {
1985 level_idx: 0,
1986 table_infos: vec![gen_sst_info(
1987 3,
1988 vec![3],
1989 FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 103,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        left_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![3],
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 105,
            level_type: LevelType::Nonoverlapping,
            total_file_size: 100,
            ..Default::default()
        });

        right_levels.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![5],
                    FullKey::for_test(
                        TableId::new(5),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(5),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![5, 6],
                    FullKey::for_test(
                        TableId::new(5),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(6),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![6],
                    FullKey::for_test(
                        TableId::new(6),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(6),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                5,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 102,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 103,
            level_type: LevelType::Nonoverlapping,
            total_file_size: 100,
            ..Default::default()
        });

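        // Case: merging two empty `Levels` must not panic and leaves nothing to assert on.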
        {
            let mut left_levels = Levels::default();
            let right_levels = Levels::default();

            group_split::merge_levels(&mut left_levels, right_levels);
        }

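        // Case: merging a populated right group into an empty left group carries over the
        // right group's sub-levels (101, 102, 103) and its L1 files unchanged.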
        {
            let mut left_levels = build_initial_compaction_group_levels(
                1,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

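        // Case: merging an empty right group into a populated left group leaves the left
        // group unchanged (sub-levels 101, 103, 105 and the 300-byte L1).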
        {
            let mut left_levels = left_levels.clone();
            let right_levels = build_initial_compaction_group_levels(
                2,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

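        // Case: merging two populated groups appends the right group's sub-levels after the
        // left's; the assertions below show them re-numbered as 106, 107, 108 and the L1
        // file sizes summed to 600.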
        {
            let mut left_levels = left_levels.clone();
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 6);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(600, left_levels.levels[0].total_file_size);
        }
    }

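    // `get_split_pos` is expected to return the index of the first SST whose key range
    // reaches the split key (here s2, covering tables 3..=5); an empty SST list yields 0.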
    #[test]
    fn test_get_split_pos() {
        let epoch = test_epoch(1);
        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
        let s3 = gen_sstable_info(3, vec![6, 7], epoch);

        let ssts = vec![s1, s2, s3];
        let split_key = group_split::build_split_key(4, VirtualNode::ZERO);

        let pos = group_split::get_split_pos(&ssts, split_key.clone());
        assert_eq!(1, pos);

        let pos = group_split::get_split_pos(&vec![], split_key);
        assert_eq!(0, pos);
    }

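    // Exercises `need_to_split` and `split_sst` on an SST covering tables [1, 2, 3, 5] with
    // split keys that fall inside, below, and above its table-id range.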
    #[test]
    fn test_split_sst() {
        let epoch = test_epoch(1);
        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);

2282 {
2283 let split_key = group_split::build_split_key(3, VirtualNode::ZERO);
2284 let origin_sst = sst.clone();
2285 let sst_size = origin_sst.sst_size;
2286 let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2287 assert_eq!(SstSplitType::Both, split_type);
2288
2289 let mut new_sst_id = 10.into();
2290 let (origin_sst, branched_sst) = group_split::split_sst(
2291 origin_sst,
2292 &mut new_sst_id,
2293 split_key,
2294 sst_size / 2,
2295 sst_size / 2,
2296 );
2297
2298 let origin_sst = origin_sst.unwrap();
2299 let branched_sst = branched_sst.unwrap();
2300
2301 assert!(origin_sst.key_range.right_exclusive);
2302 assert!(
2303 origin_sst
2304 .key_range
2305 .right
2306 .cmp(&branched_sst.key_range.left)
2307 .is_le()
2308 );
2309 assert!(origin_sst.table_ids.is_sorted());
2310 assert!(branched_sst.table_ids.is_sorted());
2311 assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2312 assert!(branched_sst.sst_size < origin_sst.file_size);
2313 assert_eq!(10, branched_sst.sst_id);
2314 assert_eq!(11, origin_sst.sst_id);
2315 assert_eq!(&3, branched_sst.table_ids.first().unwrap()); }
2317
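        // Split key at table 4: 4 is not in table_ids but lies inside the key range, so the
        // SST is still split both ways and the branched half starts at table 5.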
        {
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(&5, branched_sst.table_ids.first().unwrap());
        }

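        // Split key beyond the last table (6): the SST stays entirely on the left side.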
        {
            let split_key = group_split::build_split_key(6, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Left, split_type);
        }

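        // A split key inside the range (table 4) requires splitting both ways, while one at
        // or before the first table (table 1) sends the whole SST to the right side.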
        {
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Both, split_type);

            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Right, split_type);
        }

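        // Degenerate SST whose key range collapses onto the split key: nothing remains on
        // the origin side and the whole SST ends up in the branched half.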
        {
            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
            sst.key_range.right = sst.key_range.left.clone();
            let sst: SstableInfo = sst.into();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            assert!(origin_sst.is_none());
            assert!(branched_sst.is_some());
        }
    }

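    // Exercises `split_sst_info_for_level_v2` on a group with one non-overlapping L1 level
    // (SSTs over tables 3 and 4) and one overlapping L0 sub-level (SSTs over tables 0..=2).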
    #[test]
    fn test_split_sst_info_for_level() {
        let mut version = HummockVersion {
            id: HummockVersionId(0),
            levels: HashMap::from_iter([(
                1,
                build_initial_compaction_group_levels(
                    1,
                    &CompactionConfig {
                        max_level: 6,
                        ..Default::default()
                    },
                ),
            )]),
            ..Default::default()
        };

        let cg1 = version.levels.get_mut(&1).unwrap();

        cg1.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![3],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![3, 4],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![4],
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

        cg1.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![
                gen_sst_info(
                    2,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    22,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    23,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    24,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    25,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 300,
            ..Default::default()
        });

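        // Split the overlapping L0 sub-level at table 1, wrap the split-off SSTs into a
        // temporary right-hand group, and merge them back into cg1.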
        {
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.l0.sub_levels[0],
                &mut new_sst_id,
                split_key,
            );
            let mut right_l0 = OverlappingLevel {
                sub_levels: vec![],
                total_file_size: 0,
                uncompressed_file_size: 0,
            };

            right_l0.sub_levels.push(Level {
                level_idx: 0,
                table_infos: x,
                sub_level_id: 101,
                total_file_size: 100,
                level_type: LevelType::Overlapping,
                ..Default::default()
            });

            let right_levels = Levels {
                levels: vec![],
                l0: right_l0,
                ..Default::default()
            };

            merge_levels(cg1, right_levels);
        }

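        // Splitting an empty level produces no SSTs.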
        {
            let mut new_sst_id = 100.into();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[2],
                &mut new_sst_id,
                split_key,
            );

            assert!(x.is_empty());
        }

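        // Split key at table 1 lies below every table in L1: all three SSTs move to the
        // right side, keeping their original sst_ids, and the level is left empty.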
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(3, x.len());
            assert_eq!(1, x[0].sst_id);
            assert_eq!(100, x[0].sst_size);
            assert_eq!(10, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(11, x[2].sst_id);
            assert_eq!(100, x[2].sst_size);

            assert_eq!(0, cg1.levels[0].table_infos.len());
        }

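        // Split key at table 5 lies above every table in L1: nothing is split off and the
        // level keeps all three SSTs.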
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(5, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(0, x.len());
            assert_eq!(3, cg1.levels[0].table_infos.len());
        }

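        // Split key at table 4 cuts through SST 10 (tables [3, 4]): its right half is
        // branched off as sst_id 100 (followed by SST 11), while the left half stays in
        // place as sst_id 101 holding only table 3.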
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(2, x.len());
            assert_eq!(100, x[0].sst_id);
            assert_eq!(100 / 2, x[0].sst_size);
            assert_eq!(11, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(vec![4], x[1].table_ids);

            assert_eq!(2, cg1.levels[0].table_infos.len());
            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
            assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
        }
    }
}