1use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::{Arc, LazyLock};
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_common::log::LogSuppressor;
27use risingwave_pb::hummock::{
28 CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
29};
30use tracing::warn;
31
32use super::group_split::split_sst_with_table_ids;
33use super::{StateTableId, group_split};
34use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
35use crate::compact_task::is_compaction_task_expired;
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::key_range::KeyRangeCommon;
38use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
41use crate::vector_index::apply_vector_index_delta;
42use crate::version::{
43 GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
44 IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
45};
46use crate::{
47 CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
48};
49
/// Summary of the SST changes a version delta makes to a single compaction
/// group, as derived by `build_sst_delta_infos`.
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    /// Level index the new SSTs are inserted into (0 means L0).
    pub insert_sst_level: u32,
    /// SSTs newly inserted at `insert_sst_level`.
    pub insert_sst_infos: Vec<SstableInfo>,
    /// Object ids of the SSTs removed from the group by the delta.
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}
56
/// For a single SST object, maps each compaction group referencing it to the
/// branched sst ids that group holds for the object.
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
58
59impl<L> HummockVersionCommon<SstableInfo, L> {
60 pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
61 self.levels
62 .get(&compaction_group_id)
63 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
64 }
65
66 pub fn get_compaction_group_levels_mut(
67 &mut self,
68 compaction_group_id: CompactionGroupId,
69 ) -> &mut Levels {
70 self.levels
71 .get_mut(&compaction_group_id)
72 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
73 }
74
75 pub fn get_sst_ids_by_group_id(
77 &self,
78 compaction_group_id: CompactionGroupId,
79 ) -> impl Iterator<Item = HummockSstableId> + '_ {
80 self.levels
81 .iter()
82 .filter_map(move |(cg_id, level)| {
83 if *cg_id == compaction_group_id {
84 Some(level)
85 } else {
86 None
87 }
88 })
89 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
90 .flat_map(|level| level.table_infos.iter())
91 .map(|s| s.sst_id)
92 }
93
94 pub fn level_iter<F: FnMut(&Level) -> bool>(
95 &self,
96 compaction_group_id: CompactionGroupId,
97 mut f: F,
98 ) {
99 if let Some(levels) = self.levels.get(&compaction_group_id) {
100 for sub_level in &levels.l0.sub_levels {
101 if !f(sub_level) {
102 return;
103 }
104 }
105 for level in &levels.levels {
106 if !f(level) {
107 return;
108 }
109 }
110 }
111 }
112
113 pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
114 self.levels
116 .get(&compaction_group_id)
117 .map(|group| group.levels.len() + 1)
118 .unwrap_or(0)
119 }
120
121 pub fn safe_epoch_table_watermarks(
122 &self,
123 existing_table_ids: &[TableId],
124 ) -> BTreeMap<TableId, TableWatermarks> {
125 safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
126 }
127}
128
129pub fn safe_epoch_table_watermarks_impl(
130 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
131 existing_table_ids: &[TableId],
132) -> BTreeMap<TableId, TableWatermarks> {
133 fn extract_single_table_watermark(
134 table_watermarks: &TableWatermarks,
135 ) -> Option<TableWatermarks> {
136 if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
137 Some(TableWatermarks {
138 watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
139 direction: table_watermarks.direction,
140 watermark_type: table_watermarks.watermark_type,
141 })
142 } else {
143 None
144 }
145 }
146 table_watermarks
147 .iter()
148 .filter_map(|(table_id, table_watermarks)| {
149 if !existing_table_ids.contains(table_id) {
150 None
151 } else {
152 extract_single_table_watermark(table_watermarks)
153 .map(|table_watermarks| (*table_id, table_watermarks))
154 }
155 })
156 .collect()
157}
158
159pub fn safe_epoch_read_table_watermarks_impl(
160 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
161) -> BTreeMap<TableId, ReadTableWatermark> {
162 safe_epoch_watermarks
163 .into_iter()
164 .map(|(table_id, watermarks)| {
165 assert_eq!(watermarks.watermarks.len(), 1);
166 let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
167 let mut vnode_watermark_map = BTreeMap::new();
168 for vnode_watermark in vnode_watermarks.iter() {
169 let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
170 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
171 assert!(
172 vnode_watermark_map
173 .insert(vnode, watermark.clone())
174 .is_none(),
175 "duplicate table watermark on vnode {}",
176 vnode.to_index()
177 );
178 }
179 }
180 (
181 table_id,
182 ReadTableWatermark {
183 direction: watermarks.direction,
184 vnode_watermarks: vnode_watermark_map,
185 },
186 )
187 })
188 .collect()
189}
190
191impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
    /// Estimates how many new SSTs would be created by splitting
    /// `parent_group_id` at `split_key`: every SST whose key range straddles
    /// the split key contributes 2 (one SST on each side). Returns 0 if the
    /// group does not exist.
    pub fn count_new_ssts_in_group_split(
        &self,
        parent_group_id: CompactionGroupId,
        split_key: Bytes,
    ) -> u64 {
        self.levels
            .get(&parent_group_id)
            .map_or(0, |parent_levels| {
                let l0 = &parent_levels.l0;
                let mut split_count = 0;
                for sub_level in &l0.sub_levels {
                    assert!(!sub_level.table_infos.is_empty());

                    // Overlapping sub levels are not key-ordered, so every SST
                    // has to be checked against the split key individually.
                    if sub_level.level_type == PbLevelType::Overlapping {
                        split_count += sub_level
                            .table_infos
                            .iter()
                            .map(|sst| {
                                if let group_split::SstSplitType::Both =
                                    group_split::need_to_split(sst, split_key.clone())
                                {
                                    2
                                } else {
                                    0
                                }
                            })
                            .sum::<u64>();
                        continue;
                    }

                    // Non-overlapping sub level: at most the SST at the split
                    // position can straddle the key.
                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
                    let sst = sub_level.table_infos.get(pos).unwrap();

                    if let group_split::SstSplitType::Both =
                        group_split::need_to_split(sst, split_key.clone())
                    {
                        split_count += 2;
                    }
                }

                // Same reasoning for the non-L0 levels, which are always
                // non-overlapping.
                for level in &parent_levels.levels {
                    if level.table_infos.is_empty() {
                        continue;
                    }
                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
                    let sst = level.table_infos.get(pos).unwrap();
                    if let group_split::SstSplitType::Both =
                        group_split::need_to_split(sst, split_key.clone())
                    {
                        split_count += 2;
                    }
                }

                split_count
            })
    }
249
    /// Initializes the newly constructed compaction group `group_id` by
    /// splitting the SSTs that serve `member_table_ids` out of
    /// `parent_group_id`. Ids for the split-out SSTs are allocated
    /// sequentially starting from `new_sst_start_id`. Both groups bump their
    /// compaction group version id so compact tasks scheduled against the
    /// pre-split layout are recognized as expired.
    ///
    /// # Panics
    /// - Debug builds panic if `new_sst_start_id` is non-zero for the
    ///   placeholder `NewCompactionGroup` parent (release builds only warn).
    /// - `unreachable!` if `parent_group_id` does not exist.
    pub fn init_with_parent_group(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        member_table_ids: BTreeSet<StateTableId>,
        new_sst_start_id: HummockSstableId,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            // Nothing to inherit from the placeholder parent group.
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from",
                parent_group_id
            );
        }
        // Borrow parent and child groups mutably at the same time.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // Invalidate compact tasks created against the pre-split layout.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Split each SST of this sub level; the child-group portions
                // are returned in `insert_table_infos` (see
                // `split_sst_info_for_level`).
                let insert_table_infos =
                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
                // Remove parent SSTs left without any table id, subtracting
                // their sizes from the sub level and L0 aggregates.
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                if insert_table_infos.is_empty() {
                    continue;
                }
                // Insert into the child's matching sub level, creating a new
                // sub level at the hinted position when none matches.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos =
                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Non-L0 levels must stay key-ordered and non-overlapping.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        // Neither group may keep an empty L0 sub level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
367
    /// Derives one [`SstDeltaInfo`] per compaction group from `version_delta`,
    /// resolving removed sst ids to object ids against `self` (the version the
    /// delta applies to). Trivial-move deltas and groups containing structural
    /// deltas (construct/destroy/merge/truncate) yield nothing.
    pub fn build_sst_delta_infos(
        &self,
        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
    ) -> Vec<SstDeltaInfo> {
        let mut infos = vec![];

        // A trivial move only relocates SSTs between levels; no object is
        // added or deleted.
        if version_delta.trivial_move {
            return infos;
        }

        for (group_id, group_deltas) in &version_delta.group_deltas {
            let mut info = SstDeltaInfo::default();

            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();

            // Only plain insert/remove deltas are handled; skip the whole
            // group if any structural delta is present.
            if !group_deltas.group_deltas.iter().all(|delta| {
                matches!(
                    delta,
                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
                )
            }) {
                continue;
            }

            // First pass: collect inserted SSTs and the ids of removed SSTs,
            // bucketed by level.
            for group_delta in &group_deltas.group_deltas {
                match group_delta {
                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                        if !inserted_table_infos.is_empty() {
                            info.insert_sst_level = 0;
                            info.insert_sst_infos
                                .extend(inserted_table_infos.iter().cloned());
                        }
                    }
                    GroupDeltaCommon::IntraLevel(intra_level) => {
                        if !intra_level.inserted_table_infos.is_empty() {
                            info.insert_sst_level = intra_level.level_idx;
                            info.insert_sst_infos
                                .extend(intra_level.inserted_table_infos.iter().cloned());
                        }
                        if !intra_level.removed_table_ids.is_empty() {
                            for id in &intra_level.removed_table_ids {
                                if intra_level.level_idx == 0 {
                                    removed_l0_ssts.insert(*id);
                                } else {
                                    removed_ssts
                                        .entry(intra_level.level_idx)
                                        .or_default()
                                        .insert(*id);
                                }
                            }
                        }
                    }
                    GroupDeltaCommon::GroupConstruct(_)
                    | GroupDeltaCommon::GroupDestroy(_)
                    | GroupDeltaCommon::GroupMerge(_)
                    | GroupDeltaCommon::TruncateTables(_) => {}
                }
            }

            // Second pass: map each removed sst id to its object id by looking
            // it up in the current version's levels.
            let group = self.levels.get(group_id).unwrap();
            for l0_sub_level in &group.level0().sub_levels {
                for sst_info in &l0_sub_level.table_infos {
                    if removed_l0_ssts.remove(&sst_info.sst_id) {
                        info.delete_sst_object_ids.push(sst_info.object_id);
                    }
                }
            }
            for level in &group.levels {
                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
                    for sst_info in &level.table_infos {
                        if removed_level_ssts.remove(&sst_info.sst_id) {
                            info.delete_sst_object_ids.push(sst_info.object_id);
                        }
                    }
                    // Every removed id should have been found in the version.
                    if !removed_level_ssts.is_empty() {
                        tracing::error!(
                            "removed_level_ssts is not empty: {:?}",
                            removed_level_ssts,
                        );
                    }
                    debug_assert!(removed_level_ssts.is_empty());
                }
            }

            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
                tracing::error!(
                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
                    removed_l0_ssts,
                    removed_ssts
                );
            }
            debug_assert!(removed_l0_ssts.is_empty());
            debug_assert!(removed_ssts.is_empty());

            infos.push(info);
        }

        infos
    }
474
    /// Applies `version_delta` to `self` in place and advances `self.id`.
    ///
    /// Per compaction group this handles group construct/destroy/merge,
    /// intra-level SST insertion/removal, new L0 sub levels and table
    /// truncation; afterwards it updates table watermarks, table change logs
    /// and vector indexes.
    ///
    /// # Panics
    /// Panics if `self.id != version_delta.prev_id` or a referenced compaction
    /// group does not exist.
    pub fn apply_version_delta(
        &mut self,
        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
    ) {
        assert_eq!(self.id, version_delta.prev_id);

        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
            &version_delta.state_table_info_delta,
            &version_delta.removed_table_ids,
        );

        #[expect(deprecated)]
        {
            // Compatibility path: old deltas may bump the global max committed
            // epoch without changing any table's committed epoch.
            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
                is_commit_epoch = true;
                tracing::trace!(
                    "max committed epoch bumped but no table committed epoch is changed"
                );
            }
        }

        // Apply group deltas to every affected compaction group.
        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
            let mut is_applied_l0_compact = false;
            for group_delta in &group_deltas.group_deltas {
                match group_delta {
                    GroupDeltaCommon::GroupConstruct(group_construct) => {
                        let mut new_levels = build_initial_compaction_group_levels(
                            *compaction_group_id,
                            group_construct.get_group_config().unwrap(),
                        );
                        let parent_group_id = group_construct.parent_group_id;
                        new_levels.parent_group_id = parent_group_id;
                        #[expect(deprecated)]
                        new_levels
                            .member_table_ids
                            .clone_from(&group_construct.table_ids);
                        self.levels.insert(*compaction_group_id, new_levels);
                        // Newer deltas derive membership from state_table_info;
                        // older ones still carry an explicit table id list.
                        let member_table_ids = if group_construct.version()
                            >= CompatibilityVersion::NoMemberTableIds
                        {
                            self.state_table_info
                                .compaction_group_member_table_ids(*compaction_group_id)
                                .iter()
                                .copied()
                                .collect()
                        } else {
                            #[expect(deprecated)]
                            BTreeSet::from_iter(
                                group_construct.table_ids.iter().copied().map(Into::into),
                            )
                        };

                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
                            // Split by explicit key rather than by table set.
                            let split_key = if group_construct.split_key.is_some() {
                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
                            } else {
                                None
                            };
                            self.init_with_parent_group_v2(
                                parent_group_id,
                                *compaction_group_id,
                                group_construct.get_new_sst_start_id().into(),
                                split_key.clone(),
                            );
                        } else {
                            self.init_with_parent_group(
                                parent_group_id,
                                *compaction_group_id,
                                member_table_ids,
                                group_construct.get_new_sst_start_id().into(),
                            );
                        }
                    }
                    GroupDeltaCommon::GroupMerge(group_merge) => {
                        tracing::info!(
                            "group_merge left {:?} right {:?}",
                            group_merge.left_group_id,
                            group_merge.right_group_id
                        );
                        self.merge_compaction_group(
                            group_merge.left_group_id,
                            group_merge.right_group_id,
                        )
                    }
                    GroupDeltaCommon::IntraLevel(level_delta) => {
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });
                        if is_commit_epoch {
                            // Commit path: only new L0 sub levels may appear.
                            assert!(
                                level_delta.removed_table_ids.is_empty(),
                                "no sst should be deleted when committing an epoch"
                            );

                            let IntraLevelDelta {
                                level_idx,
                                l0_sub_level_id,
                                inserted_table_infos,
                                ..
                            } = level_delta;
                            {
                                assert_eq!(
                                    *level_idx, 0,
                                    "we should only add to L0 when we commit an epoch."
                                );
                                if !inserted_table_infos.is_empty() {
                                    insert_new_sub_level(
                                        &mut levels.l0,
                                        *l0_sub_level_id,
                                        PbLevelType::Overlapping,
                                        inserted_table_infos.clone(),
                                        None,
                                    );
                                }
                            }
                        } else {
                            // Compaction path: remove inputs, insert outputs.
                            levels.apply_compact_ssts(
                                level_delta,
                                self.state_table_info
                                    .compaction_group_member_table_ids(*compaction_group_id),
                            );
                            if level_delta.level_idx == 0 {
                                is_applied_l0_compact = true;
                            }
                        }
                    }
                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });
                        // New sub levels are only produced by epoch commits.
                        assert!(is_commit_epoch);

                        if !inserted_table_infos.is_empty() {
                            // Sub level ids are monotonically increasing.
                            let next_l0_sub_level_id = levels
                                .l0
                                .sub_levels
                                .last()
                                .map(|level| level.sub_level_id + 1)
                                .unwrap_or(1);

                            insert_new_sub_level(
                                &mut levels.l0,
                                next_l0_sub_level_id,
                                PbLevelType::Overlapping,
                                inserted_table_infos.clone(),
                                None,
                            );
                        }
                    }
                    GroupDeltaCommon::GroupDestroy(_) => {
                        self.levels.remove(compaction_group_id);
                    }

                    GroupDeltaCommon::TruncateTables(table_ids) => {
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });

                        // Strip the truncated table ids from every SST in the
                        // group (both L0 sub levels and the other levels).
                        for sub_level in &mut levels.l0.sub_levels {
                            sub_level.table_infos.iter_mut().for_each(|sstable_info| {
                                let mut inner = sstable_info.get_inner();
                                inner.table_ids.retain(|id| !table_ids.contains(id));
                                sstable_info.set_inner(inner);
                            });
                        }

                        for level in &mut levels.levels {
                            level.table_infos.iter_mut().for_each(|sstable_info| {
                                let mut inner = sstable_info.get_inner();
                                inner.table_ids.retain(|id| !table_ids.contains(id));
                                sstable_info.set_inner(inner);
                            });
                        }

                        // Layout changed: expire outstanding compact tasks.
                        levels.compaction_group_version_id += 1;
                    }
                }
            }

            // An L0 compaction may leave empty sub levels behind; clean up.
            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
            {
                levels.post_apply_l0_compact();
            }
        }
        self.id = version_delta.id;
        #[expect(deprecated)]
        {
            self.max_committed_epoch = version_delta.max_committed_epoch;
        }

        // Apply watermark changes. `None` marks a table whose watermarks are
        // to be dropped.
        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
            HashMap::new();

        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
                if version_delta.removed_table_ids.contains(table_id) {
                    modified_table_watermarks.insert(*table_id, None);
                } else {
                    let mut current_table_watermarks = (**current_table_watermarks).clone();
                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
                }
            } else {
                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
            }
        }
        // Drop watermark entries older than each table's committed epoch.
        for (table_id, table_watermarks) in &self.table_watermarks {
            let safe_epoch = if let Some(state_table_info) =
                self.state_table_info.info().get(table_id)
                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
                && state_table_info.committed_epoch > *oldest_epoch
            {
                // The table has watermarks older than its committed epoch.
                state_table_info.committed_epoch
            } else {
                continue;
            };
            let table_watermarks = modified_table_watermarks
                .entry(*table_id)
                .or_insert_with(|| Some((**table_watermarks).clone()));
            if let Some(table_watermarks) = table_watermarks {
                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
            }
        }
        // Commit the modified watermarks back into the version.
        for (table_id, table_watermarks) in modified_table_watermarks {
            if let Some(table_watermarks) = table_watermarks {
                self.table_watermarks
                    .insert(table_id, Arc::new(table_watermarks));
            } else {
                self.table_watermarks.remove(&table_id);
            }
        }

        // Apply the table change-log deltas.
        Self::apply_change_log_delta(
            &mut self.table_change_log,
            &version_delta.change_log_delta,
            &version_delta.removed_table_ids,
            &version_delta.state_table_info_delta,
            &changed_table_info,
        );

        // Apply the vector index deltas.
        apply_vector_index_delta(
            &mut self.vector_indexes,
            &version_delta.vector_index_delta,
            &version_delta.removed_table_ids,
        );
    }
739
    /// Applies change-log deltas to `table_change_log`:
    /// 1. appends each delta's new log entry to the table's change log,
    /// 2. drops logs of removed tables, and of tables that advanced their
    ///    committed epoch in this delta without providing a change-log delta,
    /// 3. truncates each surviving log to the delta's truncate epoch.
    pub fn apply_change_log_delta<T: Clone>(
        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
        removed_table_ids: &HashSet<TableId>,
        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
    ) {
        // Step 1: append the new log entries.
        for (table_id, change_log_delta) in change_log_delta {
            let new_change_log = &change_log_delta.new_log;
            match table_change_log.entry(*table_id) {
                Entry::Occupied(entry) => {
                    let change_log = entry.into_mut();
                    change_log.add_change_log(new_change_log.clone());
                }
                Entry::Vacant(entry) => {
                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
                }
            };
        }

        // Step 2: decide which change logs survive.
        table_change_log.retain(|table_id, _| {
            if removed_table_ids.contains(table_id) {
                return false;
            }
            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
                && table_info_delta.committed_epoch > prev_table_info.committed_epoch
            {
                // The table committed a newer epoch in this delta: fall through
                // to check whether a change-log delta accompanies it.
            } else {
                // No epoch advance for this table; keep its log untouched.
                return true;
            }
            let contains = change_log_delta.contains_key(table_id);
            if !contains {
                // Rate-limit the warning: at most one log line per second.
                static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                    LazyLock::new(|| LogSuppressor::per_second(1));
                if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                    warn!(
                        suppressed_count,
                        %table_id,
                        "table change log dropped due to no further change log at newly committed epoch"
                    );
                }
            }
            contains
        });

        // Step 3: truncate each remaining log to its delta's truncate epoch.
        for (table_id, change_log_delta) in change_log_delta {
            if let Some(change_log) = table_change_log.get_mut(table_id) {
                change_log.truncate(change_log_delta.truncate_epoch);
            }
        }
    }
796
797 pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
798 let mut ret: BTreeMap<_, _> = BTreeMap::new();
799 for (compaction_group_id, group) in &self.levels {
800 let mut levels = vec![];
801 levels.extend(group.l0.sub_levels.iter());
802 levels.extend(group.levels.iter());
803 for level in levels {
804 for table_info in &level.table_infos {
805 if table_info.sst_id.inner() == table_info.object_id.inner() {
806 continue;
807 }
808 let object_id = table_info.object_id;
809 let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
810 entry
811 .entry(*compaction_group_id)
812 .or_default()
813 .push(table_info.sst_id)
814 }
815 }
816 }
817 ret
818 }
819
820 pub fn merge_compaction_group(
821 &mut self,
822 left_group_id: CompactionGroupId,
823 right_group_id: CompactionGroupId,
824 ) {
825 let left_group_id_table_ids = self
827 .state_table_info
828 .compaction_group_member_table_ids(left_group_id)
829 .iter();
830 let right_group_id_table_ids = self
831 .state_table_info
832 .compaction_group_member_table_ids(right_group_id)
833 .iter();
834
835 assert!(
836 left_group_id_table_ids
837 .chain(right_group_id_table_ids)
838 .is_sorted()
839 );
840
841 let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
842 let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
843 panic!(
844 "compaction group should exist right {} all {:?}",
845 right_group_id, total_cg
846 )
847 });
848
849 let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
850 panic!(
851 "compaction group should exist left {} all {:?}",
852 left_group_id, total_cg
853 )
854 });
855
856 group_split::merge_levels(left_levels, right_levels);
857 }
858
    /// Variant of [`Self::init_with_parent_group`] used from
    /// `CompatibilityVersion::SplitGroupByTableId` on: the parent group is
    /// split at an explicit `split_key` instead of by a member table set.
    /// `split_key == None` moves nothing.
    ///
    /// # Panics
    /// - Debug builds panic if `new_sst_start_id` is non-zero for the
    ///   placeholder `NewCompactionGroup` parent (release builds only warn).
    /// - `unreachable!` if `parent_group_id` does not exist.
    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: HummockSstableId,
        split_key: Option<Bytes>,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            // Nothing to inherit from the placeholder parent group.
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        // Borrow parent and child groups mutably at the same time.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // Invalidate compact tasks created against the pre-split layout.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Split the sub level at `split_key`; the returned SSTs go to
                // the child group (see `split_sst_info_for_level_v2`).
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                if insert_table_infos.is_empty() {
                    continue;
                }

                // Drop parent SSTs left without any table id and fix up the
                // sub level and L0 size aggregates.
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                // Insert into the child's matching sub level, creating a new
                // sub level at the hinted position when none matches.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }

        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Non-L0 levels must stay key-ordered and non-overlapping.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        // Neither group may keep an empty L0 sub level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
996}
997
impl<T> HummockVersionCommon<T>
where
    T: SstableIdReader + ObjectIdReader,
{
    /// Object ids of all SSTs referenced by this version; change-log SSTs are
    /// included unless `exclude_change_log` is set.
    pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
        self.get_sst_infos(exclude_change_log)
            .map(|s| s.object_id())
            .collect()
    }

    /// Iterates over every object id held by this version: all SST objects
    /// plus the objects of every vector index.
    pub fn get_object_ids(
        &self,
        exclude_change_log: bool,
    ) -> impl Iterator<Item = HummockObjectId> + '_ {
        // Exhaustiveness guard: adding a new `HummockObjectId` variant breaks
        // this match at compile time, reminding us to extend the chain below.
        match HummockObjectId::Sstable(0.into()) {
            HummockObjectId::Sstable(_) => {}
            HummockObjectId::VectorFile(_) => {}
            HummockObjectId::HnswGraphFile(_) => {}
        };
        self.get_sst_infos(exclude_change_log)
            .map(|s| HummockObjectId::Sstable(s.object_id()))
            .chain(
                self.vector_indexes
                    .values()
                    .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
            )
    }

    /// Sst ids of all SSTs referenced by this version; change-log SSTs are
    /// included unless `exclude_change_log` is set.
    pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
        self.get_sst_infos(exclude_change_log)
            .map(|s| s.sst_id())
            .collect()
    }

    /// Iterates over the SSTs of every level in every group, optionally
    /// chained with the old/new-value SSTs of all table change logs.
    pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
        let may_table_change_log = if exclude_change_log {
            None
        } else {
            Some(self.table_change_log.values())
        };
        self.get_combined_levels()
            .flat_map(|level| level.table_infos.iter())
            .chain(
                may_table_change_log
                    .map(|v| {
                        v.flat_map(|table_change_log| {
                            table_change_log.iter().flat_map(|epoch_change_log| {
                                epoch_change_log
                                    .old_value
                                    .iter()
                                    .chain(epoch_change_log.new_value.iter())
                            })
                        })
                    })
                    // `None` (exclude_change_log) flattens to an empty chain.
                    .into_iter()
                    .flatten(),
            )
    }
}
1060
impl Levels {
    /// Applies a compaction-generated `IntraLevelDelta` to this group:
    /// removes the compacted input SSTs and inserts the output SSTs.
    ///
    /// The delta is ignored (with a warning) when it was produced against an
    /// older compaction group version, i.e. by an expired compact task.
    pub(crate) fn apply_compact_ssts(
        &mut self,
        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
        member_table_ids: &BTreeSet<TableId>,
    ) {
        let IntraLevelDeltaCommon {
            level_idx,
            l0_sub_level_id,
            inserted_table_infos: insert_table_infos,
            vnode_partition_count,
            removed_table_ids: delete_sst_ids_set,
            compaction_group_version_id,
        } = level_delta;
        let new_vnode_partition_count = *vnode_partition_count;

        // A delta from a task scheduled before a group split/merge must not
        // be applied to the new layout.
        if is_compaction_task_expired(
            self.compaction_group_version_id,
            *compaction_group_version_id,
        ) {
            warn!(
                current_compaction_group_version_id = self.compaction_group_version_id,
                delta_compaction_group_version_id = compaction_group_version_id,
                level_idx,
                l0_sub_level_id,
                insert_table_infos = ?insert_table_infos
                    .iter()
                    .map(|sst| (sst.sst_id, sst.object_id))
                    .collect_vec(),
                ?delete_sst_ids_set,
                "This VersionDelta may be committed by an expired compact task. Please check it."
            );
            return;
        }
        if !delete_sst_ids_set.is_empty() {
            if *level_idx == 0 {
                // L0 inputs may come from any sub level.
                for level in &mut self.l0.sub_levels {
                    level_delete_ssts(level, delete_sst_ids_set);
                }
            } else {
                // `level_idx` is 1-based for non-L0 levels.
                let idx = *level_idx as usize - 1;
                level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
            }
        }

        if !insert_table_infos.is_empty() {
            let insert_sst_level_id = *level_idx;
            let insert_sub_level_id = *l0_sub_level_id;
            if insert_sst_level_id == 0 {
                // Locate the target sub level by its id (sub levels are kept
                // sorted by sub_level_id).
                let l0 = &mut self.l0;
                let index = l0
                    .sub_levels
                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
                assert!(
                    index < l0.sub_levels.len()
                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
                    "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
                    insert_sub_level_id,
                    delete_sst_ids_set,
                    l0.sub_levels
                        .iter()
                        .map(|level| level.sub_level_id)
                        .collect_vec()
                );
                // A sub level emptied by the removals above may adopt the new
                // vnode partition count, but only when the group serves a
                // single table and every inserted SST belongs to it.
                if l0.sub_levels[index].table_infos.is_empty()
                    && member_table_ids.len() == 1
                    && insert_table_infos.iter().all(|sst| {
                        sst.table_ids.len() == 1
                            && sst.table_ids[0]
                                == *member_table_ids.iter().next().expect("non-empty")
                    })
                {
                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
                }
                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
            } else {
                let idx = insert_sst_level_id as usize - 1;
                if self.levels[idx].table_infos.is_empty()
                    && insert_table_infos
                        .iter()
                        .all(|sst| sst.table_ids.len() == 1)
                {
                    // Emptied level with single-table SSTs: take the delta's
                    // partition count.
                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
                } else if self.levels[idx].vnode_partition_count != 0
                    && new_vnode_partition_count == 0
                    && member_table_ids.len() > 1
                {
                    // Multiple tables share the level again; vnode
                    // partitioning no longer applies.
                    self.levels[idx].vnode_partition_count = 0;
                }
                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
            }
        }
    }

    /// Cleans up L0 after an L0 compaction delta: removes emptied sub levels
    /// and recomputes the cached L0 size aggregates.
    pub(crate) fn post_apply_l0_compact(&mut self) {
        {
            self.l0
                .sub_levels
                .retain(|level| !level.table_infos.is_empty());
            self.l0.total_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.total_file_size)
                .sum::<u64>();
            self.l0.uncompressed_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.uncompressed_file_size)
                .sum::<u64>();
        }
    }
}
1177
1178impl<T, L> HummockVersionCommon<T, L> {
1179 pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1180 self.levels
1181 .values()
1182 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1183 }
1184}
1185
1186pub fn build_initial_compaction_group_levels(
1187 group_id: CompactionGroupId,
1188 compaction_config: &CompactionConfig,
1189) -> Levels {
1190 let mut levels = vec![];
1191 for l in 0..compaction_config.get_max_level() {
1192 levels.push(Level {
1193 level_idx: (l + 1) as u32,
1194 level_type: PbLevelType::Nonoverlapping,
1195 table_infos: vec![],
1196 total_file_size: 0,
1197 sub_level_id: 0,
1198 uncompressed_file_size: 0,
1199 vnode_partition_count: 0,
1200 });
1201 }
1202 #[expect(deprecated)] Levels {
1204 levels,
1205 l0: OverlappingLevel {
1206 sub_levels: vec![],
1207 total_file_size: 0,
1208 uncompressed_file_size: 0,
1209 },
1210 group_id,
1211 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
1212 member_table_ids: vec![],
1213 compaction_group_version_id: 0,
1214 }
1215}
1216
1217fn split_sst_info_for_level(
1218 member_table_ids: &BTreeSet<TableId>,
1219 level: &mut Level,
1220 new_sst_id: &mut HummockSstableId,
1221) -> Vec<SstableInfo> {
1222 let mut insert_table_infos = vec![];
1225 for sst_info in &mut level.table_infos {
1226 let removed_table_ids = sst_info
1227 .table_ids
1228 .iter()
1229 .filter(|table_id| member_table_ids.contains(table_id))
1230 .cloned()
1231 .collect_vec();
1232 let sst_size = sst_info.sst_size;
1233 if sst_size / 2 == 0 {
1234 tracing::warn!(
1235 id = %sst_info.sst_id,
1236 object_id = %sst_info.object_id,
1237 sst_size = sst_info.sst_size,
1238 file_size = sst_info.file_size,
1239 "Sstable sst_size is under expected",
1240 );
1241 };
1242 if !removed_table_ids.is_empty() {
1243 let (modified_sst, branch_sst) = split_sst_with_table_ids(
1244 sst_info,
1245 new_sst_id,
1246 sst_size / 2,
1247 sst_size / 2,
1248 member_table_ids.iter().cloned().collect_vec(),
1249 );
1250 *sst_info = modified_sst;
1251 insert_table_infos.push(branch_sst);
1252 }
1253 }
1254 insert_table_infos
1255}
1256
1257pub fn get_compaction_group_ids(
1259 version: &HummockVersion,
1260) -> impl Iterator<Item = CompactionGroupId> + '_ {
1261 version.levels.keys().cloned()
1262}
1263
1264pub fn get_table_compaction_group_id_mapping(
1265 version: &HummockVersion,
1266) -> HashMap<StateTableId, CompactionGroupId> {
1267 version
1268 .state_table_info
1269 .info()
1270 .iter()
1271 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
1272 .collect()
1273}
1274
1275pub fn get_compaction_group_ssts(
1277 version: &HummockVersion,
1278 group_id: CompactionGroupId,
1279) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1280 let group_levels = version.get_compaction_group_levels(group_id);
1281 group_levels
1282 .l0
1283 .sub_levels
1284 .iter()
1285 .rev()
1286 .chain(group_levels.levels.iter())
1287 .flat_map(|level| {
1288 level
1289 .table_infos
1290 .iter()
1291 .map(|table_info| (table_info.object_id, table_info.sst_id))
1292 })
1293}
1294
1295pub fn new_sub_level(
1296 sub_level_id: u64,
1297 level_type: PbLevelType,
1298 table_infos: Vec<SstableInfo>,
1299) -> Level {
1300 if level_type == PbLevelType::Nonoverlapping {
1301 debug_assert!(
1302 can_concat(&table_infos),
1303 "sst of non-overlapping level is not concat-able: {:?}",
1304 table_infos
1305 );
1306 }
1307 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1308 let uncompressed_file_size = table_infos
1309 .iter()
1310 .map(|table| table.uncompressed_file_size)
1311 .sum();
1312 Level {
1313 level_idx: 0,
1314 level_type,
1315 table_infos,
1316 total_file_size,
1317 sub_level_id,
1318 uncompressed_file_size,
1319 vnode_partition_count: 0,
1320 }
1321}
1322
1323pub fn add_ssts_to_sub_level(
1324 l0: &mut OverlappingLevel,
1325 sub_level_idx: usize,
1326 insert_table_infos: Vec<SstableInfo>,
1327) {
1328 insert_table_infos.iter().for_each(|sst| {
1329 l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1330 l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1331 l0.total_file_size += sst.sst_size;
1332 l0.uncompressed_file_size += sst.uncompressed_file_size;
1333 });
1334 l0.sub_levels[sub_level_idx]
1335 .table_infos
1336 .extend(insert_table_infos);
1337 if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1338 l0.sub_levels[sub_level_idx]
1339 .table_infos
1340 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1341 assert!(
1342 can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1343 "sstable ids: {:?}",
1344 l0.sub_levels[sub_level_idx]
1345 .table_infos
1346 .iter()
1347 .map(|sst| sst.sst_id)
1348 .collect_vec()
1349 );
1350 }
1351}
1352
1353pub fn insert_new_sub_level(
1355 l0: &mut OverlappingLevel,
1356 insert_sub_level_id: u64,
1357 level_type: PbLevelType,
1358 insert_table_infos: Vec<SstableInfo>,
1359 sub_level_insert_hint: Option<usize>,
1360) {
1361 if insert_sub_level_id == u64::MAX {
1362 return;
1363 }
1364 let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1365 insert_pos
1366 } else {
1367 if let Some(newest_level) = l0.sub_levels.last() {
1368 assert!(
1369 newest_level.sub_level_id < insert_sub_level_id,
1370 "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1371 newest_level.sub_level_id,
1372 insert_sub_level_id,
1373 l0,
1374 );
1375 }
1376 l0.sub_levels.len()
1377 };
1378 #[cfg(debug_assertions)]
1379 {
1380 if insert_pos > 0
1381 && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1382 {
1383 debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1384 }
1385 if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1386 debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1387 }
1388 }
1389 let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1392 l0.total_file_size += level.total_file_size;
1393 l0.uncompressed_file_size += level.uncompressed_file_size;
1394 l0.sub_levels.insert(insert_pos, level);
1395}
1396
1397fn level_delete_ssts(
1401 operand: &mut Level,
1402 delete_sst_ids_superset: &HashSet<HummockSstableId>,
1403) -> bool {
1404 let original_len = operand.table_infos.len();
1405 operand
1406 .table_infos
1407 .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1408 operand.total_file_size = operand
1409 .table_infos
1410 .iter()
1411 .map(|table| table.sst_size)
1412 .sum::<u64>();
1413 operand.uncompressed_file_size = operand
1414 .table_infos
1415 .iter()
1416 .map(|table| table.uncompressed_file_size)
1417 .sum::<u64>();
1418 original_len != operand.table_infos.len()
1419}
1420
/// Inserts `insert_table_infos` into `operand`, updating the cached size
/// statistics and keeping the level concat-able.
///
/// - An `Overlapping` level is promoted to `Nonoverlapping`: all SSTs are
///   appended, then the whole level is re-sorted by key range.
/// - For an already non-overlapping level, the batch is sorted and spliced
///   in at the position found by binary search when it fits between existing
///   entries without overlap; otherwise (unexpected overlap) a warning is
///   logged and each SST is inserted individually at its own position.
///
/// Panics (via `assert!`) if the resulting level is not concat-able.
fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
    // Helper used only to render assertion failure messages.
    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
        format!(
            "sstable ids: {:?}",
            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
        )
    }
    operand.total_file_size += insert_table_infos
        .iter()
        .map(|sst| sst.sst_size)
        .sum::<u64>();
    operand.uncompressed_file_size += insert_table_infos
        .iter()
        .map(|sst| sst.uncompressed_file_size)
        .sum::<u64>();
    if operand.level_type == PbLevelType::Overlapping {
        // Promote overlapping -> non-overlapping: append everything, then
        // sort the whole level by key range.
        operand.level_type = PbLevelType::Nonoverlapping;
        operand
            .table_infos
            .extend(insert_table_infos.iter().cloned());
        operand
            .table_infos
            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
        assert!(
            can_concat(&operand.table_infos),
            "{}",
            display_sstable_infos(&operand.table_infos)
        );
    } else if !insert_table_infos.is_empty() {
        let sorted_insert: Vec<_> = insert_table_infos
            .iter()
            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
            .cloned()
            .collect();
        let first = &sorted_insert[0];
        let last = &sorted_insert[sorted_insert.len() - 1];
        // Index of the first existing SST that is not strictly left of
        // `first` — i.e. where the batch would be spliced in.
        let pos = operand
            .table_infos
            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
        if pos >= operand.table_infos.len()
            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
        {
            // Fast path: the whole sorted batch fits contiguously at `pos`.
            operand.table_infos.splice(pos..pos, sorted_insert);
            // Only the neighborhood of the splice needs re-validation: one
            // SST before it, the inserted run, and one SST after it.
            let validate_range = operand
                .table_infos
                .iter()
                .skip(pos.saturating_sub(1))
                .take(insert_table_infos.len() + 2)
                .collect_vec();
            assert!(
                can_concat(&validate_range),
                "{}",
                display_sstable_infos(&validate_range),
            );
        } else {
            // Slow path: the batch overlaps existing key ranges, which is not
            // expected here — log it, insert one by one, then validate the
            // whole level.
            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
            for i in insert_table_infos {
                let pos = operand
                    .table_infos
                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
                operand.table_infos.insert(pos, i.clone());
            }
            assert!(
                can_concat(&operand.table_infos),
                "{}",
                display_sstable_infos(&operand.table_infos)
            );
        }
    }
}
1494
/// Computes a map from every object id referenced by `version` — level SSTs,
/// change-log SSTs (old and new values), and vector-index files — to its
/// file size in bytes. Duplicate references collapse via the `HashMap`.
pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
    // Compile-time exhaustiveness reminder: if a new `HummockObjectId`
    // variant is added, this match stops compiling, prompting an update of
    // the collection logic below. It does nothing at runtime.
    match HummockObjectId::Sstable(0.into()) {
        HummockObjectId::Sstable(_) => {}
        HummockObjectId::VectorFile(_) => {}
        HummockObjectId::HnswGraphFile(_) => {}
    };
    version
        .levels
        .values()
        .flat_map(|cg| {
            // All SSTs in L0 sub-levels and non-L0 levels of each group.
            cg.level0()
                .sub_levels
                .iter()
                .chain(cg.levels.iter())
                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
        })
        // Change-log entries reference both old-value and new-value SSTs.
        .chain(version.table_change_log.values().flat_map(|c| {
            c.iter().flat_map(|l| {
                l.old_value
                    .iter()
                    .chain(l.new_value.iter())
                    .map(|t| (t.object_id, t.file_size))
            })
        }))
        .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
        // Vector indexes track their own (id, size) pairs.
        .chain(
            version
                .vector_indexes
                .values()
                .flat_map(|index| index.get_objects()),
        )
        .collect()
}
1531
1532pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1535 let mut res = Vec::new();
1536 for (group_id, levels) in &version.levels {
1538 if levels.group_id != *group_id {
1540 res.push(format!(
1541 "GROUP {}: inconsistent group id {} in Levels",
1542 group_id, levels.group_id
1543 ));
1544 }
1545
1546 let validate_level = |group: CompactionGroupId,
1547 expected_level_idx: u32,
1548 level: &Level,
1549 res: &mut Vec<String>| {
1550 let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1551 if level.level_idx == 0 {
1552 level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1553 if level.table_infos.is_empty() {
1555 res.push(format!("{}: empty level", level_identifier));
1556 }
1557 } else if level.level_type != PbLevelType::Nonoverlapping {
1558 res.push(format!(
1560 "{}: level type {:?} is not non-overlapping",
1561 level_identifier, level.level_type
1562 ));
1563 }
1564
1565 if level.level_idx != expected_level_idx {
1567 res.push(format!(
1568 "{}: mismatched level idx {}",
1569 level_identifier, expected_level_idx
1570 ));
1571 }
1572
1573 let mut prev_table_info: Option<&SstableInfo> = None;
1574 for table_info in &level.table_infos {
1575 if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1577 res.push(format!(
1578 "{} SST {}: table_ids not sorted",
1579 level_identifier, table_info.object_id
1580 ));
1581 }
1582
1583 if level.level_type == PbLevelType::Nonoverlapping {
1585 if let Some(prev) = prev_table_info.take()
1586 && prev
1587 .key_range
1588 .compare_right_with(&table_info.key_range.left)
1589 != Ordering::Less
1590 {
1591 res.push(format!(
1592 "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1593 level_identifier, table_info.object_id, prev, table_info
1594 ));
1595 }
1596 let _ = prev_table_info.insert(table_info);
1597 }
1598 }
1599 };
1600
1601 let l0 = &levels.l0;
1602 let mut prev_sub_level_id = u64::MAX;
1603 for sub_level in &l0.sub_levels {
1604 if sub_level.sub_level_id >= prev_sub_level_id {
1606 res.push(format!(
1607 "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1608 group_id, sub_level.level_idx, prev_sub_level_id
1609 ));
1610 }
1611 prev_sub_level_id = sub_level.sub_level_id;
1612
1613 validate_level(*group_id, 0, sub_level, &mut res);
1614 }
1615
1616 for idx in 1..=levels.levels.len() {
1617 validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1618 }
1619 }
1620 res
1621}
1622
1623#[cfg(test)]
1624mod tests {
1625 use std::collections::{HashMap, HashSet};
1626
1627 use bytes::Bytes;
1628 use risingwave_common::catalog::TableId;
1629 use risingwave_common::hash::VirtualNode;
1630 use risingwave_common::util::epoch::test_epoch;
1631 use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1632
1633 use super::group_split;
1634 use crate::HummockVersionId;
1635 use crate::compaction_group::group_split::*;
1636 use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1637 use crate::key::{FullKey, gen_key_from_str};
1638 use crate::key_range::KeyRange;
1639 use crate::level::{Level, Levels, OverlappingLevel};
1640 use crate::sstable_info::{SstableInfo, SstableInfoInner};
1641 use crate::version::{
1642 GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1643 };
1644
1645 fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1646 gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1647 }
1648
1649 fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1650 let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1651 let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1652 let full_key_l = FullKey::for_test(
1653 TableId::new(*table_ids.first().unwrap()),
1654 table_key_l,
1655 epoch,
1656 )
1657 .encode();
1658 let full_key_r =
1659 FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1660 .encode();
1661
1662 SstableInfoInner {
1663 sst_id: sst_id.into(),
1664 key_range: KeyRange {
1665 left: full_key_l.into(),
1666 right: full_key_r.into(),
1667 right_exclusive: false,
1668 },
1669 table_ids: table_ids.into_iter().map(Into::into).collect(),
1670 object_id: sst_id.into(),
1671 min_epoch: 20,
1672 max_epoch: 20,
1673 file_size: 100,
1674 sst_size: 100,
1675 ..Default::default()
1676 }
1677 }
1678
    /// `get_object_ids` must see objects added to both L0 sub-levels and
    /// non-L0 levels.
    #[test]
    fn test_get_sst_object_ids() {
        // Start from a version with one empty compaction group: no objects.
        let mut version = HummockVersion {
            id: HummockVersionId::new(0),
            levels: HashMap::from_iter([(
                0,
                Levels {
                    levels: vec![],
                    l0: OverlappingLevel {
                        sub_levels: vec![],
                        total_file_size: 0,
                        uncompressed_file_size: 0,
                    },
                    ..Default::default()
                },
            )]),
            ..Default::default()
        };
        assert_eq!(version.get_object_ids(false).count(), 0);

        // An SST pushed into an L0 sub-level makes its object id visible.
        version
            .levels
            .get_mut(&0)
            .unwrap()
            .l0
            .sub_levels
            .push(Level {
                table_infos: vec![
                    SstableInfoInner {
                        object_id: 11.into(),
                        sst_id: 11.into(),
                        ..Default::default()
                    }
                    .into(),
                ],
                ..Default::default()
            });
        assert_eq!(version.get_object_ids(false).count(), 1);

        // An SST in a non-L0 level is counted as well.
        version.levels.get_mut(&0).unwrap().levels.push(Level {
            table_infos: vec![
                SstableInfoInner {
                    object_id: 22.into(),
                    sst_id: 22.into(),
                    ..Default::default()
                }
                .into(),
            ],
            ..Default::default()
        });
        assert_eq!(version.get_object_ids(false).count(), 2);
    }
1733
    /// Applying a delta that constructs group 2, destroys group 0, and adds
    /// one SST to level 1 of group 1 must leave exactly groups {1, 2} with
    /// the expected contents.
    #[test]
    fn test_apply_version_delta() {
        // Two existing groups (0 and 1), both with empty 6-level layouts.
        let mut version = HummockVersion {
            id: HummockVersionId::new(0),
            levels: HashMap::from_iter([
                (
                    0,
                    build_initial_compaction_group_levels(
                        0,
                        &CompactionConfig {
                            max_level: 6,
                            ..Default::default()
                        },
                    ),
                ),
                (
                    1,
                    build_initial_compaction_group_levels(
                        1,
                        &CompactionConfig {
                            max_level: 6,
                            ..Default::default()
                        },
                    ),
                ),
            ]),
            ..Default::default()
        };
        let version_delta = HummockVersionDelta {
            id: HummockVersionId::new(1),
            group_deltas: HashMap::from_iter([
                // Construct a new group 2.
                (
                    2,
                    GroupDeltas {
                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
                            group_config: Some(CompactionConfig {
                                max_level: 6,
                                ..Default::default()
                            }),
                            ..Default::default()
                        }))],
                    },
                ),
                // Destroy group 0.
                (
                    0,
                    GroupDeltas {
                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
                    },
                ),
                // Insert SST 1 into level 1 of group 1.
                (
                    1,
                    GroupDeltas {
                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
                            1,
                            0,
                            HashSet::new(),
                            vec![
                                SstableInfoInner {
                                    object_id: 1.into(),
                                    sst_id: 1.into(),
                                    ..Default::default()
                                }
                                .into(),
                            ],
                            0,
                            version
                                .levels
                                .get(&1)
                                .as_ref()
                                .unwrap()
                                .compaction_group_version_id,
                        ))],
                    },
                ),
            ]),
            ..Default::default()
        };
        let version_delta = version_delta;

        version.apply_version_delta(&version_delta);
        // Expected state of group 1 after the delta: SST 1 sitting in level 1.
        let mut cg1 = build_initial_compaction_group_levels(
            1,
            &CompactionConfig {
                max_level: 6,
                ..Default::default()
            },
        );
        cg1.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                SstableInfoInner {
                    object_id: 1.into(),
                    sst_id: 1.into(),
                    ..Default::default()
                }
                .into(),
            ],
            ..Default::default()
        };
        // Group 0 is gone, group 2 exists (empty), group 1 holds the SST.
        assert_eq!(
            version,
            HummockVersion {
                id: HummockVersionId::new(1),
                levels: HashMap::from_iter([
                    (
                        2,
                        build_initial_compaction_group_levels(
                            2,
                            &CompactionConfig {
                                max_level: 6,
                                ..Default::default()
                            },
                        ),
                    ),
                    (1, cg1),
                ]),
                ..Default::default()
            }
        );
    }
1855
1856 fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1857 gen_sst_info_impl(object_id, table_ids, left, right).into()
1858 }
1859
1860 fn gen_sst_info_impl(
1861 object_id: u64,
1862 table_ids: Vec<u32>,
1863 left: Bytes,
1864 right: Bytes,
1865 ) -> SstableInfoInner {
1866 SstableInfoInner {
1867 object_id: object_id.into(),
1868 sst_id: object_id.into(),
1869 key_range: KeyRange {
1870 left,
1871 right,
1872 right_exclusive: false,
1873 },
1874 table_ids: table_ids.into_iter().map(Into::into).collect(),
1875 file_size: 100,
1876 sst_size: 100,
1877 uncompressed_file_size: 100,
1878 ..Default::default()
1879 }
1880 }
1881
    /// Exercises `group_split::merge_levels` in four scenarios: both sides
    /// empty, left empty, right empty, and both non-empty (sub-level ids of
    /// the right side are re-assigned to follow the left side's newest).
    #[test]
    fn test_merge_levels() {
        let mut left_levels = build_initial_compaction_group_levels(
            1,
            &CompactionConfig {
                max_level: 6,
                ..Default::default()
            },
        );

        let mut right_levels = build_initial_compaction_group_levels(
            2,
            &CompactionConfig {
                max_level: 6,
                ..Default::default()
            },
        );

        // Left group: 3 SSTs in level 1 (tables 3 and 4), total size 300.
        left_levels.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![3],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![3, 4],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![4],
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

        // Left group L0: sub-levels 101 and 103 (overlapping) and 105
        // (non-overlapping), each holding one SST of table 3.
        left_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![3],
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        left_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![3],
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 103,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        left_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![3],
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 105,
            level_type: LevelType::Nonoverlapping,
            total_file_size: 100,
            ..Default::default()
        });

        // Right group: 3 SSTs in level 1 (tables 5 and 6), total size 300.
        right_levels.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![5],
                    FullKey::for_test(
                        TableId::new(5),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(5),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![5, 6],
                    FullKey::for_test(
                        TableId::new(5),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(6),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![6],
                    FullKey::for_test(
                        TableId::new(6),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(6),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

        // Right group L0: sub-levels 101, 102 (overlapping) and 103
        // (non-overlapping), each holding one SST of table 5.
        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                5,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 102,
            level_type: LevelType::Overlapping,
            total_file_size: 100,
            ..Default::default()
        });

        right_levels.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![gen_sst_info(
                3,
                vec![5],
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(5),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            )],
            sub_level_id: 103,
            level_type: LevelType::Nonoverlapping,
            total_file_size: 100,
            ..Default::default()
        });

        // Scenario 1: merging two empty Levels must not panic.
        {
            let mut left_levels = Levels::default();
            let right_levels = Levels::default();

            group_split::merge_levels(&mut left_levels, right_levels);
        }

        // Scenario 2: empty left + populated right — right's layout is
        // adopted wholesale (sub-level ids 101/102/103 preserved).
        {
            let mut left_levels = build_initial_compaction_group_levels(
                1,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

        // Scenario 3: populated left + empty right — left is unchanged
        // (sub-level ids 101/103/105 preserved).
        {
            let mut left_levels = left_levels.clone();
            let right_levels = build_initial_compaction_group_levels(
                2,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            );

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 3);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(300, left_levels.levels[0].total_file_size);
        }

        // Scenario 4: both populated — right's sub-levels are appended after
        // left's newest (105) with re-assigned ids 106/107/108, and the
        // level-1 sizes are summed.
        {
            let mut left_levels = left_levels.clone();
            let right_levels = right_levels.clone();

            group_split::merge_levels(&mut left_levels, right_levels);

            assert!(left_levels.l0.sub_levels.len() == 6);
            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);

            assert!(left_levels.levels[0].level_idx == 1);
            assert_eq!(600, left_levels.levels[0].total_file_size);
        }
    }
2264
    /// `get_split_pos` must return the index of the first SST containing the
    /// split key's table, and 0 for an empty SST list.
    #[test]
    fn test_get_split_pos() {
        let epoch = test_epoch(1);
        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
        let s3 = gen_sstable_info(3, vec![6, 7], epoch);

        let ssts = vec![s1, s2, s3];
        // Table 4 lives inside s2, so the split position is index 1.
        let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);

        let pos = group_split::get_split_pos(&ssts, split_key.clone());
        assert_eq!(1, pos);

        // An empty SST list always splits at position 0.
        let pos = group_split::get_split_pos(&vec![], split_key);
        assert_eq!(0, pos);
    }
2281
    /// Exercises `need_to_split` / `split_sst` on an SST covering tables
    /// {1, 2, 3, 5}: split keys inside, past, and before the range, plus a
    /// degenerate point-range SST.
    #[test]
    fn test_split_sst() {
        let epoch = test_epoch(1);
        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);

        {
            // Split at table 3 (present in the SST): both halves survive,
            // table 3 moves to the branched side.
            let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(
                origin_sst
                    .key_range
                    .right
                    .cmp(&branched_sst.key_range.left)
                    .is_le()
            );
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id());
        }

        {
            // Split at table 4 (absent; falls between tables 3 and 5): still
            // splits both ways, table 5 leads the branched side.
            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let sst_size = origin_sst.sst_size;
            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
            assert_eq!(SstSplitType::Both, split_type);

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            let origin_sst = origin_sst.unwrap();
            let branched_sst = branched_sst.unwrap();

            assert!(origin_sst.key_range.right_exclusive);
            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
            assert!(origin_sst.table_ids.is_sorted());
            assert!(branched_sst.table_ids.is_sorted());
            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
            assert!(branched_sst.sst_size < origin_sst.file_size);
            assert_eq!(10, branched_sst.sst_id);
            assert_eq!(11, origin_sst.sst_id);
            assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id());
        }

        {
            // Split key past the last table id: the whole SST stays left.
            let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
            let split_type = group_split::need_to_split(&sst, split_key);
            assert_eq!(SstSplitType::Left, split_type);
        }

        {
            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
            let origin_sst = sst.clone();
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Both, split_type);

            // Split key at the first table id: the whole SST goes right.
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
            let origin_sst = sst;
            let split_type = group_split::need_to_split(&origin_sst, split_key);
            assert_eq!(SstSplitType::Right, split_type);
        }

        {
            // Degenerate SST whose key range collapses to a single point at
            // the split key: nothing remains on the left side.
            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
            sst.key_range.right = sst.key_range.left.clone();
            let sst: SstableInfo = sst.into();
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
            let origin_sst = sst;
            let sst_size = origin_sst.sst_size;

            let mut new_sst_id = 10.into();
            let (origin_sst, branched_sst) = group_split::split_sst(
                origin_sst,
                &mut new_sst_id,
                split_key,
                sst_size / 2,
                sst_size / 2,
            );

            assert!(origin_sst.is_none());
            assert!(branched_sst.is_some());
        }
    }
2394
    #[test]
    fn test_split_sst_info_for_level() {
        // Exercises `group_split::split_sst_info_for_level_v2` against a
        // hand-built compaction group containing both an overlapping L0
        // sub-level and a non-overlapping L1, covering:
        //   - split key before all ssts (everything moves right),
        //   - split key after all ssts (nothing moves),
        //   - split key inside an sst's table-id range (physical split),
        //   - split of an empty level (no-op).

        // A version with a single compaction group (id 1) and 6 levels.
        let mut version = HummockVersion {
            id: HummockVersionId(0),
            levels: HashMap::from_iter([(
                1,
                build_initial_compaction_group_levels(
                    1,
                    &CompactionConfig {
                        max_level: 6,
                        ..Default::default()
                    },
                ),
            )]),
            ..Default::default()
        };

        let cg1 = version.levels.get_mut(&1).unwrap();

        // L1 (non-overlapping), three ssts in key order:
        //   sst 1  -> table [3]
        //   sst 10 -> tables [3, 4]  (straddles the table-3/table-4 boundary,
        //             so a split at table 4 must physically split this sst)
        //   sst 11 -> table [4]
        cg1.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![3],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![3, 4],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![4],
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

        // One overlapping L0 sub-level (sub_level_id 101) with five ssts all
        // declaring table_ids = [2]; key ranges vary (some key ranges start at
        // TableId 0, which is fine for an Overlapping level).
        cg1.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![
                gen_sst_info(
                    2,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    22,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    23,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    24,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    25,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 300,
            ..Default::default()
        });

        {
            // Split the L0 sub-level at the table-1 boundary, wrap the split-off
            // ssts into a right-hand group, and merge it back into `cg1`.
            // NOTE(review): this mutates `cg1` in place — the clone-based cases
            // below operate on the post-merge state; verify ordering if editing.
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.l0.sub_levels[0],
                &mut new_sst_id,
                split_key,
            );
            let mut right_l0 = OverlappingLevel {
                sub_levels: vec![],
                total_file_size: 0,
                uncompressed_file_size: 0,
            };

            right_l0.sub_levels.push(Level {
                level_idx: 0,
                table_infos: x,
                sub_level_id: 101,
                total_file_size: 100,
                level_type: LevelType::Overlapping,
                ..Default::default()
            });

            let right_levels = Levels {
                levels: vec![],
                l0: right_l0,
                ..Default::default()
            };

            merge_levels(cg1, right_levels);
        }

        {
            // Splitting an empty level (L3 was never populated) yields nothing.
            let mut new_sst_id = 100.into();
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[2],
                &mut new_sst_id,
                split_key,
            );

            assert!(x.is_empty());
        }

        {
            // Split key (table 1) is below every table id in L1 (3 and 4), so
            // all three ssts move wholly to the right side: ids and sizes are
            // unchanged and no new sst ids are allocated; the level is emptied.
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(3, x.len());
            assert_eq!(1, x[0].sst_id);
            assert_eq!(100, x[0].sst_size);
            assert_eq!(10, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(11, x[2].sst_id);
            assert_eq!(100, x[2].sst_size);

            assert_eq!(0, cg1.levels[0].table_infos.len());
        }

        {
            // Split key (table 5) is above every table id in L1, so nothing
            // moves right and the level keeps all three ssts.
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(5.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(0, x.len());
            assert_eq!(3, cg1.levels[0].table_infos.len());
        }

        {
            // Split key (table 4) falls inside sst 10's table range [3, 4]:
            //  - right side: new sst 100 (right half of 10, half the size,
            //    tables [4]) plus sst 11 moved wholly;
            //  - left side: sst 1 plus new sst 101 (left half of 10, half the
            //    size, tables [3]).
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(2, x.len());
            assert_eq!(100, x[0].sst_id);
            assert_eq!(100 / 2, x[0].sst_size);
            assert_eq!(11, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(vec![TableId::new(4)], x[1].table_ids);

            assert_eq!(2, cg1.levels[0].table_infos.len());
            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
            assert_eq!(
                vec![TableId::new(3)],
                cg1.levels[0].table_infos[1].table_ids
            );
        }
    }
2724}