Skip to main content

risingwave_hummock_sdk/compaction_group/
hummock_version_ext.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::{Arc, LazyLock};
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_common::log::LogSuppressor;
27use risingwave_pb::hummock::{
28    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
29};
30use tracing::warn;
31
32use super::group_split::split_sst_with_table_ids;
33use super::{StateTableId, group_split};
34use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
35use crate::compact_task::is_compaction_task_expired;
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::key_range::KeyRangeCommon;
38use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
41use crate::vector_index::apply_vector_index_delta;
42use crate::version::{
43    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
44    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
45};
46use crate::{
47    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
48};
49
/// Summary of the SST insertions and removals that a version delta applies to
/// one compaction group.
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    /// Level index the inserted SSTs go to (`0` for a new L0 sub-level).
    pub insert_sst_level: u32,
    /// SSTs inserted by the delta.
    pub insert_sst_infos: Vec<SstableInfo>,
    /// Object ids of the SSTs removed by the delta.
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}
56
/// Maps a compaction group id to a list of SST ids.
// NOTE(review): presumably the SST ids that one shared ("branched") object has
// in each compaction group — confirm against the users of this alias.
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
58
59impl<L> HummockVersionCommon<SstableInfo, L> {
60    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
61        self.levels
62            .get(&compaction_group_id)
63            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
64    }
65
66    pub fn get_compaction_group_levels_mut(
67        &mut self,
68        compaction_group_id: CompactionGroupId,
69    ) -> &mut Levels {
70        self.levels
71            .get_mut(&compaction_group_id)
72            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
73    }
74
75    // only scan the sst infos from levels in the specified compaction group (without table change log)
76    pub fn get_sst_ids_by_group_id(
77        &self,
78        compaction_group_id: CompactionGroupId,
79    ) -> impl Iterator<Item = HummockSstableId> + '_ {
80        self.levels
81            .iter()
82            .filter_map(move |(cg_id, level)| {
83                if *cg_id == compaction_group_id {
84                    Some(level)
85                } else {
86                    None
87                }
88            })
89            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
90            .flat_map(|level| level.table_infos.iter())
91            .map(|s| s.sst_id)
92    }
93
94    /// Prune stale table ids that no longer exist in `state_table_info` from SST metadata.
95    ///
96    /// This is used to normalize recovered versions from old clusters where dropped table ids
97    /// may still exist in persisted SST metadata.
98    pub fn prune_stale_table_ids_from_ssts(&mut self) -> usize {
99        let live_table_ids: HashSet<_> = self.state_table_info.info().keys().copied().collect();
100        // Older checkpoints may rely on deprecated `member_table_ids` before `state_table_info`
101        // is backfilled.
102        if live_table_ids.is_empty()
103            && self.levels.values().any(|levels| {
104                #[expect(deprecated)]
105                {
106                    !levels.member_table_ids.is_empty()
107                }
108            })
109        {
110            return 0;
111        }
112
113        let mut pruned_table_ids = HashSet::new();
114
115        for levels in self.levels.values_mut() {
116            let stale_table_ids = levels
117                .l0
118                .sub_levels
119                .iter()
120                .chain(levels.levels.iter())
121                .flat_map(|level| level.table_infos.iter())
122                .flat_map(|sst| sst.table_ids.iter().copied())
123                .filter(|table_id| !live_table_ids.contains(table_id))
124                .collect::<HashSet<_>>();
125
126            if stale_table_ids.is_empty() {
127                continue;
128            }
129
130            pruned_table_ids.extend(stale_table_ids.iter().copied());
131            levels.prune_table_ids_from_ssts(&stale_table_ids);
132        }
133
134        pruned_table_ids.len()
135    }
136
137    pub fn level_iter<F: FnMut(&Level) -> bool>(
138        &self,
139        compaction_group_id: CompactionGroupId,
140        mut f: F,
141    ) {
142        if let Some(levels) = self.levels.get(&compaction_group_id) {
143            for sub_level in &levels.l0.sub_levels {
144                if !f(sub_level) {
145                    return;
146                }
147            }
148            for level in &levels.levels {
149                if !f(level) {
150                    return;
151                }
152            }
153        }
154    }
155
156    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
157        // l0 is currently separated from all levels
158        self.levels
159            .get(&compaction_group_id)
160            .map(|group| group.levels.len() + 1)
161            .unwrap_or(0)
162    }
163
164    pub fn safe_epoch_table_watermarks(
165        &self,
166        existing_table_ids: &[TableId],
167    ) -> BTreeMap<TableId, TableWatermarks> {
168        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
169    }
170}
171
172pub fn safe_epoch_table_watermarks_impl(
173    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
174    existing_table_ids: &[TableId],
175) -> BTreeMap<TableId, TableWatermarks> {
176    fn extract_single_table_watermark(
177        table_watermarks: &TableWatermarks,
178    ) -> Option<TableWatermarks> {
179        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
180            Some(TableWatermarks {
181                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
182                direction: table_watermarks.direction,
183                watermark_type: table_watermarks.watermark_type,
184            })
185        } else {
186            None
187        }
188    }
189    table_watermarks
190        .iter()
191        .filter_map(|(table_id, table_watermarks)| {
192            if !existing_table_ids.contains(table_id) {
193                None
194            } else {
195                extract_single_table_watermark(table_watermarks)
196                    .map(|table_watermarks| (*table_id, table_watermarks))
197            }
198        })
199        .collect()
200}
201
202pub fn safe_epoch_read_table_watermarks_impl(
203    safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
204) -> BTreeMap<TableId, ReadTableWatermark> {
205    safe_epoch_watermarks
206        .into_iter()
207        .map(|(table_id, watermarks)| {
208            assert_eq!(watermarks.watermarks.len(), 1);
209            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
210            let mut vnode_watermark_map = BTreeMap::new();
211            for vnode_watermark in vnode_watermarks.iter() {
212                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
213                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
214                    assert!(
215                        vnode_watermark_map
216                            .insert(vnode, watermark.clone())
217                            .is_none(),
218                        "duplicate table watermark on vnode {}",
219                        vnode.to_index()
220                    );
221                }
222            }
223            (
224                table_id,
225                ReadTableWatermark {
226                    direction: watermarks.direction,
227                    vnode_watermarks: vnode_watermark_map,
228                },
229            )
230        })
231        .collect()
232}
233
234impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
235    pub fn count_new_ssts_in_group_split(
236        &self,
237        parent_group_id: CompactionGroupId,
238        split_key: Bytes,
239    ) -> u64 {
240        self.levels
241            .get(&parent_group_id)
242            .map_or(0, |parent_levels| {
243                let l0 = &parent_levels.l0;
244                let mut split_count = 0;
245                for sub_level in &l0.sub_levels {
246                    assert!(!sub_level.table_infos.is_empty());
247
248                    if sub_level.level_type == PbLevelType::Overlapping {
249                        // TODO: use table_id / vnode / key_range filter
250                        split_count += sub_level
251                            .table_infos
252                            .iter()
253                            .map(|sst| {
254                                if let group_split::SstSplitType::Both =
255                                    group_split::need_to_split(sst, split_key.clone())
256                                {
257                                    2
258                                } else {
259                                    0
260                                }
261                            })
262                            .sum::<u64>();
263                        continue;
264                    }
265
266                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
267                    let sst = sub_level.table_infos.get(pos).unwrap();
268
269                    if let group_split::SstSplitType::Both =
270                        group_split::need_to_split(sst, split_key.clone())
271                    {
272                        split_count += 2;
273                    }
274                }
275
276                for level in &parent_levels.levels {
277                    if level.table_infos.is_empty() {
278                        continue;
279                    }
280                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
281                    let sst = level.table_infos.get(pos).unwrap();
282                    if let group_split::SstSplitType::Both =
283                        group_split::need_to_split(sst, split_key.clone())
284                    {
285                        split_count += 2;
286                    }
287                }
288
289                split_count
290            })
291    }
292
    /// Populates the compaction group `group_id` with SSTs taken from
    /// `parent_group_id`, selected by `member_table_ids`.
    ///
    /// When `parent_group_id` is `StaticCompactionGroupId::NewCompactionGroup`
    /// there is no parent to take SSTs from and the method returns without
    /// touching any levels. Otherwise both groups get their
    /// `compaction_group_version_id` bumped, and the SSTs returned by
    /// `split_sst_info_for_level` for each parent L0 sub-level / level are added
    /// to the corresponding sub-level / level of `group_id`.
    ///
    /// `new_sst_start_id` seeds the running SST id handed to
    /// `split_sst_info_for_level`.
    // NOTE(review): `split_sst_info_for_level` is defined outside this view;
    // presumably it moves/splits the SSTs that contain `member_table_ids` and
    // advances the running id — confirm against its definition.
    ///
    /// # Panics
    ///
    /// - In debug builds, if the parent is `NewCompactionGroup` and
    ///   `new_sst_start_id` is non-zero (release builds only log a warning).
    /// - If `parent_group_id` does not exist, or `group_id` does not exist.
    /// - If `group_id` has fewer non-L0 levels than the parent (index out of bounds).
    /// - If the SSTs of a child level cannot be concatenated after insertion, or
    ///   any L0 sub-level of either group ends up empty.
    pub fn init_with_parent_group(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        member_table_ids: BTreeSet<StateTableId>,
        new_sst_start_id: HummockSstableId,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
            // A brand-new group has no parent SSTs, so a non-zero start id is unexpected.
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from",
                parent_group_id
            );
        }
        // Borrow parent and child levels mutably at the same time; both must exist.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Remove SST from sub level may result in empty sub level. It will be purged
                // whenever another compaction task is finished.
                let insert_table_infos =
                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
                sub_level.normalize();
                if insert_table_infos.is_empty() {
                    continue;
                }
                // `Ok(idx)`: add to the existing child sub-level at `idx`;
                // `Err(idx)`: create a new child sub-level at position `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }
            l0.normalize();
        }
        // Non-L0 levels: the child level at the same index receives the SSTs.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos =
                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Keep the child level ordered by key range so its SSTs can be concatenated.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level.normalize();
        }

        // Neither group may be left with an empty L0 sub-level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
395
396    pub fn build_sst_delta_infos(
397        &self,
398        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
399    ) -> Vec<SstDeltaInfo> {
400        let mut infos = vec![];
401
402        // Skip trivial move delta for refiller
403        // The trivial move task only changes the position of the sst in the lsm, it does not modify the object information corresponding to the sst, and does not need to re-execute the refill.
404        if version_delta.trivial_move {
405            return infos;
406        }
407
408        for (group_id, group_deltas) in &version_delta.group_deltas {
409            let mut info = SstDeltaInfo::default();
410
411            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
412            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
413
414            // Build only if all deltas are intra level deltas.
415            if !group_deltas.group_deltas.iter().all(|delta| {
416                matches!(
417                    delta,
418                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
419                )
420            }) {
421                continue;
422            }
423
424            // TODO(MrCroxx): At most one insert delta is allowed here. It's okay for now with the
425            // current `hummock::manager::gen_version_delta` implementation. Better refactor the
426            // struct to reduce conventions.
427            for group_delta in &group_deltas.group_deltas {
428                match group_delta {
429                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
430                        if !inserted_table_infos.is_empty() {
431                            info.insert_sst_level = 0;
432                            info.insert_sst_infos
433                                .extend(inserted_table_infos.iter().cloned());
434                        }
435                    }
436                    GroupDeltaCommon::IntraLevel(intra_level) => {
437                        if !intra_level.inserted_table_infos.is_empty() {
438                            info.insert_sst_level = intra_level.level_idx;
439                            info.insert_sst_infos
440                                .extend(intra_level.inserted_table_infos.iter().cloned());
441                        }
442                        if !intra_level.removed_table_ids.is_empty() {
443                            for id in &intra_level.removed_table_ids {
444                                if intra_level.level_idx == 0 {
445                                    removed_l0_ssts.insert(*id);
446                                } else {
447                                    removed_ssts
448                                        .entry(intra_level.level_idx)
449                                        .or_default()
450                                        .insert(*id);
451                                }
452                            }
453                        }
454                    }
455                    GroupDeltaCommon::GroupConstruct(_)
456                    | GroupDeltaCommon::GroupDestroy(_)
457                    | GroupDeltaCommon::GroupMerge(_)
458                    | GroupDeltaCommon::PruneTableIdsFromSsts(_) => {}
459                }
460            }
461
462            let group = self.levels.get(group_id).unwrap();
463            for l0_sub_level in &group.level0().sub_levels {
464                for sst_info in &l0_sub_level.table_infos {
465                    if removed_l0_ssts.remove(&sst_info.sst_id) {
466                        info.delete_sst_object_ids.push(sst_info.object_id);
467                    }
468                }
469            }
470            for level in &group.levels {
471                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
472                    for sst_info in &level.table_infos {
473                        if removed_level_ssts.remove(&sst_info.sst_id) {
474                            info.delete_sst_object_ids.push(sst_info.object_id);
475                        }
476                    }
477                    if !removed_level_ssts.is_empty() {
478                        tracing::error!(
479                            "removed_level_ssts is not empty: {:?}",
480                            removed_level_ssts,
481                        );
482                    }
483                    debug_assert!(removed_level_ssts.is_empty());
484                }
485            }
486
487            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
488                tracing::error!(
489                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
490                    removed_l0_ssts,
491                    removed_ssts
492                );
493            }
494            debug_assert!(removed_l0_ssts.is_empty());
495            debug_assert!(removed_ssts.is_empty());
496
497            infos.push(info);
498        }
499
500        infos
501    }
502
503    /// Used by migration of table change log to meta store.
504    /// `Self::table_change_log` will be consumed by the migration process later.
505    pub fn apply_table_change_log_delta_backward_compatibility(
506        &mut self,
507        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
508    ) {
509        #[expect(deprecated)]
510        for (table_id, change_log_delta) in &version_delta.change_log_delta {
511            let new_change_log = &change_log_delta.new_log;
512            match self.table_change_log.entry(*table_id) {
513                Entry::Occupied(entry) => {
514                    let change_log = entry.into_mut();
515                    change_log.add_change_log(new_change_log.clone());
516                }
517                Entry::Vacant(entry) => {
518                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
519                }
520            };
521        }
522    }
523
524    pub fn apply_version_delta(
525        &mut self,
526        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
527    ) -> HashMap<TableId, Option<StateTableInfo>> {
528        assert_eq!(self.id, version_delta.prev_id);
529
530        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
531            &version_delta.state_table_info_delta,
532            &version_delta.removed_table_ids,
533        );
534
535        #[expect(deprecated)]
536        {
537            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
538                is_commit_epoch = true;
539                tracing::trace!(
540                    "max committed epoch bumped but no table committed epoch is changed"
541                );
542            }
543        }
544
545        // apply to `levels`, which is different compaction groups
546        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
547            let mut is_applied_l0_compact = false;
548            for group_delta in &group_deltas.group_deltas {
549                match group_delta {
550                    GroupDeltaCommon::GroupConstruct(group_construct) => {
551                        let mut new_levels = build_initial_compaction_group_levels(
552                            *compaction_group_id,
553                            group_construct.get_group_config().unwrap(),
554                        );
555                        let parent_group_id = group_construct.parent_group_id;
556                        new_levels.parent_group_id = parent_group_id;
557                        #[expect(deprecated)]
558                        // for backward-compatibility of previous hummock version delta
559                        new_levels
560                            .member_table_ids
561                            .clone_from(&group_construct.table_ids);
562                        self.levels.insert(*compaction_group_id, new_levels);
563                        let member_table_ids = if group_construct.version()
564                            >= CompatibilityVersion::NoMemberTableIds
565                        {
566                            self.state_table_info
567                                .compaction_group_member_table_ids(*compaction_group_id)
568                                .iter()
569                                .copied()
570                                .collect()
571                        } else {
572                            #[expect(deprecated)]
573                            // for backward-compatibility of previous hummock version delta
574                            BTreeSet::from_iter(
575                                group_construct.table_ids.iter().copied().map(Into::into),
576                            )
577                        };
578
579                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
580                            let split_key = if group_construct.split_key.is_some() {
581                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
582                            } else {
583                                None
584                            };
585                            self.init_with_parent_group_v2(
586                                parent_group_id,
587                                *compaction_group_id,
588                                group_construct.new_sst_start_id,
589                                split_key.clone(),
590                            );
591                        } else {
592                            // for backward-compatibility of previous hummock version delta
593                            self.init_with_parent_group(
594                                parent_group_id,
595                                *compaction_group_id,
596                                member_table_ids,
597                                group_construct.new_sst_start_id,
598                            );
599                        }
600                    }
601                    GroupDeltaCommon::GroupMerge(group_merge) => {
602                        tracing::info!(
603                            "group_merge left {:?} right {:?}",
604                            group_merge.left_group_id,
605                            group_merge.right_group_id
606                        );
607                        self.merge_compaction_group(
608                            group_merge.left_group_id,
609                            group_merge.right_group_id,
610                        )
611                    }
612                    GroupDeltaCommon::IntraLevel(level_delta) => {
613                        let levels =
614                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
615                                panic!("compaction group {} does not exist", compaction_group_id)
616                            });
617                        if is_commit_epoch {
618                            assert!(
619                                level_delta.removed_table_ids.is_empty(),
620                                "no sst should be deleted when committing an epoch"
621                            );
622
623                            let IntraLevelDelta {
624                                level_idx,
625                                l0_sub_level_id,
626                                inserted_table_infos,
627                                ..
628                            } = level_delta;
629                            {
630                                assert_eq!(
631                                    *level_idx, 0,
632                                    "we should only add to L0 when we commit an epoch."
633                                );
634                                if !inserted_table_infos.is_empty() {
635                                    insert_new_sub_level(
636                                        &mut levels.l0,
637                                        *l0_sub_level_id,
638                                        PbLevelType::Overlapping,
639                                        inserted_table_infos.clone(),
640                                        None,
641                                    );
642                                }
643                            }
644                        } else {
645                            // The delta is caused by compaction.
646                            levels.apply_compact_ssts(
647                                level_delta,
648                                self.state_table_info
649                                    .compaction_group_member_table_ids(*compaction_group_id),
650                            );
651                            if level_delta.level_idx == 0 {
652                                is_applied_l0_compact = true;
653                            }
654                        }
655                    }
656                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
657                        let levels =
658                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
659                                panic!("compaction group {} does not exist", compaction_group_id)
660                            });
661                        assert!(is_commit_epoch);
662
663                        if !inserted_table_infos.is_empty() {
664                            let next_l0_sub_level_id = levels
665                                .l0
666                                .sub_levels
667                                .last()
668                                .map(|level| level.sub_level_id + 1)
669                                .unwrap_or(1);
670
671                            insert_new_sub_level(
672                                &mut levels.l0,
673                                next_l0_sub_level_id,
674                                PbLevelType::Overlapping,
675                                inserted_table_infos.clone(),
676                                None,
677                            );
678                        }
679                    }
680                    GroupDeltaCommon::GroupDestroy(_) => {
681                        self.levels.remove(compaction_group_id);
682                    }
683
684                    GroupDeltaCommon::PruneTableIdsFromSsts(table_ids) => {
685                        self.levels
686                            .get_mut(compaction_group_id)
687                            .unwrap_or_else(|| {
688                                panic!("compaction group {} does not exist", compaction_group_id)
689                            })
690                            .prune_table_ids_from_ssts(table_ids);
691                    }
692                }
693            }
694
695            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
696            {
697                levels.l0.normalize();
698            }
699        }
700        self.id = version_delta.id;
701        #[expect(deprecated)]
702        {
703            self.max_committed_epoch = version_delta.max_committed_epoch;
704        }
705
706        // apply to table watermark
707
708        // Store the table watermarks that needs to be updated. None means to remove the table watermark of the table id
709        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
710            HashMap::new();
711
712        // apply to table watermark
713        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
714            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
715                if version_delta.removed_table_ids.contains(table_id) {
716                    modified_table_watermarks.insert(*table_id, None);
717                } else {
718                    let mut current_table_watermarks = (**current_table_watermarks).clone();
719                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
720                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
721                }
722            } else {
723                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
724            }
725        }
726        for (table_id, table_watermarks) in &self.table_watermarks {
727            let safe_epoch = if let Some(state_table_info) =
728                self.state_table_info.info().get(table_id)
729                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
730                && state_table_info.committed_epoch > *oldest_epoch
731            {
732                // safe epoch has progressed, need further clear.
733                state_table_info.committed_epoch
734            } else {
735                // safe epoch not progressed or the table has been removed. No need to truncate
736                continue;
737            };
738            let table_watermarks = modified_table_watermarks
739                .entry(*table_id)
740                .or_insert_with(|| Some((**table_watermarks).clone()));
741            if let Some(table_watermarks) = table_watermarks {
742                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
743            }
744        }
745        // apply the staging table watermark to hummock version
746        for (table_id, table_watermarks) in modified_table_watermarks {
747            if let Some(table_watermarks) = table_watermarks {
748                self.table_watermarks
749                    .insert(table_id, Arc::new(table_watermarks));
750            } else {
751                self.table_watermarks.remove(&table_id);
752            }
753        }
754        // apply to vector index
755        apply_vector_index_delta(
756            &mut self.vector_indexes,
757            &version_delta.vector_index_delta,
758            &version_delta.removed_table_ids,
759        );
760
761        changed_table_info
762    }
763
764    pub fn apply_change_log_delta<T: Clone>(
765        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
766        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
767    ) {
768        for (table_id, change_log_delta) in change_log_delta {
769            let new_change_log = &change_log_delta.new_log;
770            match table_change_log.entry(*table_id) {
771                Entry::Occupied(entry) => {
772                    let change_log = entry.into_mut();
773                    change_log.add_change_log(new_change_log.clone());
774                }
775                Entry::Vacant(entry) => {
776                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
777                }
778            };
779        }
780
781        // truncate the remaining table change log
782        for (table_id, change_log_delta) in change_log_delta {
783            if let Some(change_log) = table_change_log.get_mut(table_id) {
784                change_log.truncate(change_log_delta.truncate_epoch);
785            }
786        }
787    }
788
789    /// Returns the log deltas required to truncate the entire change log for a table.
790    pub fn collect_gc_change_log_delta<'a, T: Clone>(
791        current_change_log_table_ids: impl Iterator<Item = &'a TableId>,
792        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
793        removed_table_ids: &HashSet<TableId>,
794        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
795        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
796    ) -> HashSet<TableId> {
797        let mut gc_change_log_delta = HashSet::new();
798        // If a table has no new change log entry (even an empty one), it means we have stopped maintained
799        // the change log for the table, and then we will remove the table change log.
800        // The table change log will also be removed when the table id is removed.
801        for table_id in current_change_log_table_ids {
802            if removed_table_ids.contains(table_id) {
803                gc_change_log_delta.insert(*table_id);
804                continue;
805            }
806            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
807                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
808                && table_info_delta.committed_epoch > prev_table_info.committed_epoch
809            {
810                // the table exists previously, and its committed epoch has progressed.
811            } else {
812                // otherwise, the table change log should be kept anyway
813                continue;
814            }
815            let contains = change_log_delta.contains_key(table_id);
816            if !contains {
817                gc_change_log_delta.insert(*table_id);
818                static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
819                    LazyLock::new(|| LogSuppressor::per_second(1));
820                if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
821                    warn!(
822                        suppressed_count,
823                        %table_id,
824                        "table change log dropped due to no further change log at newly committed epoch"
825                    );
826                }
827            }
828        }
829        gc_change_log_delta
830    }
831
832    pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
833        let mut ret: BTreeMap<_, _> = BTreeMap::new();
834        for (compaction_group_id, group) in &self.levels {
835            let mut levels = vec![];
836            levels.extend(group.l0.sub_levels.iter());
837            levels.extend(group.levels.iter());
838            for level in levels {
839                for table_info in &level.table_infos {
840                    if table_info.sst_id.as_raw_id() == table_info.object_id.as_raw_id() {
841                        continue;
842                    }
843                    let object_id = table_info.object_id;
844                    let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
845                    entry
846                        .entry(*compaction_group_id)
847                        .or_default()
848                        .push(table_info.sst_id)
849                }
850            }
851        }
852        ret
853    }
854
855    pub fn merge_compaction_group(
856        &mut self,
857        left_group_id: CompactionGroupId,
858        right_group_id: CompactionGroupId,
859    ) {
860        // Double check
861        let left_group_id_table_ids = self
862            .state_table_info
863            .compaction_group_member_table_ids(left_group_id)
864            .iter();
865        let right_group_id_table_ids = self
866            .state_table_info
867            .compaction_group_member_table_ids(right_group_id)
868            .iter();
869
870        assert!(
871            left_group_id_table_ids
872                .chain(right_group_id_table_ids)
873                .is_sorted()
874        );
875
876        let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
877        let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
878            panic!(
879                "compaction group should exist right {} all {:?}",
880                right_group_id, total_cg
881            )
882        });
883
884        let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
885            panic!(
886                "compaction group should exist left {} all {:?}",
887                left_group_id, total_cg
888            )
889        });
890
891        group_split::merge_levels(left_levels, right_levels);
892    }
893
894    pub fn init_with_parent_group_v2(
895        &mut self,
896        parent_group_id: CompactionGroupId,
897        group_id: CompactionGroupId,
898        new_sst_start_id: HummockSstableId,
899        split_key: Option<Bytes>,
900    ) {
901        let mut new_sst_id = new_sst_start_id;
902        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
903            if new_sst_start_id != 0 {
904                if cfg!(debug_assertions) {
905                    panic!(
906                        "non-zero sst start id {} for NewCompactionGroup",
907                        new_sst_start_id
908                    );
909                } else {
910                    warn!(
911                        %new_sst_start_id,
912                        "non-zero sst start id for NewCompactionGroup"
913                    );
914                }
915            }
916            return;
917        } else if !self.levels.contains_key(&parent_group_id) {
918            unreachable!(
919                "non-existing parent group id {} to init from (V2)",
920                parent_group_id
921            );
922        }
923
924        let [parent_levels, cur_levels] = self
925            .levels
926            .get_disjoint_mut([&parent_group_id, &group_id])
927            .map(|res| res.unwrap());
928        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
929        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
930        parent_levels.compaction_group_version_id += 1;
931        cur_levels.compaction_group_version_id += 1;
932
933        let l0 = &mut parent_levels.l0;
934        {
935            for sub_level in &mut l0.sub_levels {
936                let target_l0 = &mut cur_levels.l0;
937                // Remove SST from sub level may result in empty sub level. It will be purged
938                // whenever another compaction task is finished.
939                let insert_table_infos = if let Some(split_key) = &split_key {
940                    group_split::split_sst_info_for_level_v2(
941                        sub_level,
942                        &mut new_sst_id,
943                        split_key.clone(),
944                    )
945                } else {
946                    vec![]
947                };
948
949                if insert_table_infos.is_empty() {
950                    continue;
951                }
952
953                sub_level.normalize();
954                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
955                    Ok(idx) => {
956                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
957                    }
958                    Err(idx) => {
959                        insert_new_sub_level(
960                            target_l0,
961                            sub_level.sub_level_id,
962                            sub_level.level_type,
963                            insert_table_infos,
964                            Some(idx),
965                        );
966                    }
967                }
968            }
969            l0.normalize();
970        }
971
972        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
973            let insert_table_infos = if let Some(split_key) = &split_key {
974                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
975            } else {
976                vec![]
977            };
978
979            if insert_table_infos.is_empty() {
980                continue;
981            }
982
983            cur_levels.levels[idx].total_file_size += insert_table_infos
984                .iter()
985                .map(|sst| sst.sst_size)
986                .sum::<u64>();
987            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
988                .iter()
989                .map(|sst| sst.uncompressed_file_size)
990                .sum::<u64>();
991            cur_levels.levels[idx]
992                .table_infos
993                .extend(insert_table_infos);
994            cur_levels.levels[idx]
995                .table_infos
996                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
997            assert!(can_concat(&cur_levels.levels[idx].table_infos));
998            level.normalize();
999        }
1000
1001        assert!(
1002            parent_levels
1003                .l0
1004                .sub_levels
1005                .iter()
1006                .all(|level| !level.table_infos.is_empty())
1007        );
1008        assert!(
1009            cur_levels
1010                .l0
1011                .sub_levels
1012                .iter()
1013                .all(|level| !level.table_infos.is_empty())
1014        );
1015    }
1016}
1017
1018impl<T> HummockVersionCommon<T>
1019where
1020    T: SstableIdReader + ObjectIdReader,
1021{
1022    pub fn get_object_ids(&self) -> impl Iterator<Item = HummockObjectId> + '_ {
1023        // DO NOT REMOVE THIS LINE
1024        // This is to ensure that when adding new variant to `HummockObjectId`,
1025        // the compiler will warn us if we forget to handle it here.
1026        match HummockObjectId::Sstable(0.into()) {
1027            HummockObjectId::Sstable(_) => {}
1028            HummockObjectId::VectorFile(_) => {}
1029            HummockObjectId::HnswGraphFile(_) => {}
1030        };
1031        self.get_sst_infos()
1032            .map(|s| HummockObjectId::Sstable(s.object_id()))
1033            .chain(
1034                self.vector_indexes
1035                    .values()
1036                    .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
1037            )
1038    }
1039
1040    pub fn get_sst_ids(&self) -> HashSet<HummockSstableId> {
1041        self.get_sst_infos().map(|s| s.sst_id()).collect()
1042    }
1043
1044    pub fn get_sst_infos(&self) -> impl Iterator<Item = &T> {
1045        self.get_combined_levels()
1046            .flat_map(|level| level.table_infos.iter())
1047    }
1048}
1049
1050impl Levels {
1051    pub(crate) fn apply_compact_ssts(
1052        &mut self,
1053        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
1054        member_table_ids: &BTreeSet<TableId>,
1055    ) {
1056        let IntraLevelDeltaCommon {
1057            level_idx,
1058            l0_sub_level_id,
1059            inserted_table_infos: insert_table_infos,
1060            vnode_partition_count,
1061            removed_table_ids: delete_sst_ids_set,
1062            compaction_group_version_id,
1063        } = level_delta;
1064        let new_vnode_partition_count = *vnode_partition_count;
1065
1066        if is_compaction_task_expired(
1067            self.compaction_group_version_id,
1068            *compaction_group_version_id,
1069        ) {
1070            warn!(
1071                current_compaction_group_version_id = self.compaction_group_version_id,
1072                delta_compaction_group_version_id = compaction_group_version_id,
1073                level_idx,
1074                l0_sub_level_id,
1075                insert_table_infos = ?insert_table_infos
1076                    .iter()
1077                    .map(|sst| (sst.sst_id, sst.object_id))
1078                    .collect_vec(),
1079                ?delete_sst_ids_set,
1080                "This VersionDelta may be committed by an expired compact task. Please check it."
1081            );
1082            return;
1083        }
1084        if !delete_sst_ids_set.is_empty() {
1085            if *level_idx == 0 {
1086                for level in &mut self.l0.sub_levels {
1087                    level.delete_ssts(delete_sst_ids_set);
1088                }
1089            } else {
1090                let idx = *level_idx as usize - 1;
1091                self.levels[idx].delete_ssts(delete_sst_ids_set);
1092            }
1093        }
1094
1095        if !insert_table_infos.is_empty() {
1096            let insert_sst_level_id = *level_idx;
1097            let insert_sub_level_id = *l0_sub_level_id;
1098            if insert_sst_level_id == 0 {
1099                let l0 = &mut self.l0;
1100                let index = l0
1101                    .sub_levels
1102                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
1103                assert!(
1104                    index < l0.sub_levels.len()
1105                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
1106                    "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
1107                    insert_sub_level_id,
1108                    delete_sst_ids_set,
1109                    l0.sub_levels
1110                        .iter()
1111                        .map(|level| level.sub_level_id)
1112                        .collect_vec()
1113                );
1114                if l0.sub_levels[index].table_infos.is_empty()
1115                    && member_table_ids.len() == 1
1116                    && insert_table_infos.iter().all(|sst| {
1117                        sst.table_ids.len() == 1
1118                            && sst.table_ids[0]
1119                                == *member_table_ids.iter().next().expect("non-empty")
1120                    })
1121                {
1122                    // Only change vnode_partition_count for group which has only one state-table.
1123                    // Only change vnode_partition_count for level which update all sst files in this compact task.
1124                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
1125                }
1126                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
1127            } else {
1128                let idx = insert_sst_level_id as usize - 1;
1129                if self.levels[idx].table_infos.is_empty()
1130                    && insert_table_infos
1131                        .iter()
1132                        .all(|sst| sst.table_ids.len() == 1)
1133                {
1134                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
1135                } else if self.levels[idx].vnode_partition_count != 0
1136                    && new_vnode_partition_count == 0
1137                    && member_table_ids.len() > 1
1138                {
1139                    self.levels[idx].vnode_partition_count = 0;
1140                }
1141                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
1142            }
1143        }
1144    }
1145
1146    /// Prune specified table ids from all SST metadata, remove emptied SSTs and sub-levels,
1147    /// then bump `compaction_group_version_id`.
1148    pub(crate) fn prune_table_ids_from_ssts(&mut self, table_ids: &HashSet<TableId>) {
1149        for level in self.l0.sub_levels.iter_mut().chain(self.levels.iter_mut()) {
1150            level.prune_table_ids_from_ssts(table_ids);
1151        }
1152        self.l0.normalize();
1153        self.compaction_group_version_id += 1;
1154    }
1155}
1156
1157impl<T, L> HummockVersionCommon<T, L> {
1158    pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1159        self.levels
1160            .values()
1161            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1162    }
1163}
1164
1165pub fn build_initial_compaction_group_levels(
1166    group_id: impl Into<CompactionGroupId>,
1167    compaction_config: &CompactionConfig,
1168) -> Levels {
1169    let mut levels = vec![];
1170    for l in 0..compaction_config.get_max_level() {
1171        levels.push(Level {
1172            level_idx: (l + 1) as u32,
1173            level_type: PbLevelType::Nonoverlapping,
1174            table_infos: vec![],
1175            total_file_size: 0,
1176            sub_level_id: 0,
1177            uncompressed_file_size: 0,
1178            vnode_partition_count: 0,
1179        });
1180    }
1181    #[expect(deprecated)] // for backward-compatibility of previous hummock version delta
1182    Levels {
1183        levels,
1184        l0: OverlappingLevel {
1185            sub_levels: vec![],
1186            total_file_size: 0,
1187            uncompressed_file_size: 0,
1188        },
1189        group_id: group_id.into(),
1190        parent_group_id: 0.into(),
1191        member_table_ids: vec![],
1192        compaction_group_version_id: 0,
1193    }
1194}
1195
1196fn split_sst_info_for_level(
1197    member_table_ids: &BTreeSet<TableId>,
1198    level: &mut Level,
1199    new_sst_id: &mut HummockSstableId,
1200) -> Vec<SstableInfo> {
1201    // Remove SST from sub level may result in empty sub level. It will be purged
1202    // whenever another compaction task is finished.
1203    let mut insert_table_infos = vec![];
1204    for sst_info in &mut level.table_infos {
1205        let removed_table_ids = sst_info
1206            .table_ids
1207            .iter()
1208            .filter(|table_id| member_table_ids.contains(*table_id))
1209            .cloned()
1210            .collect_vec();
1211        let sst_size = sst_info.sst_size;
1212        if sst_size / 2 == 0 {
1213            tracing::warn!(
1214                id = %sst_info.sst_id,
1215                object_id = %sst_info.object_id,
1216                sst_size = sst_info.sst_size,
1217                file_size = sst_info.file_size,
1218                "Sstable sst_size is under expected",
1219            );
1220        };
1221        if !removed_table_ids.is_empty() {
1222            let (modified_sst, branch_sst) = split_sst_with_table_ids(
1223                sst_info,
1224                new_sst_id,
1225                sst_size / 2,
1226                sst_size / 2,
1227                member_table_ids.iter().cloned().collect_vec(),
1228            );
1229            *sst_info = modified_sst;
1230            insert_table_infos.push(branch_sst);
1231        }
1232    }
1233    insert_table_infos
1234}
1235
1236/// Gets all compaction group ids.
1237pub fn get_compaction_group_ids(
1238    version: &HummockVersion,
1239) -> impl Iterator<Item = CompactionGroupId> + '_ {
1240    version.levels.keys().cloned()
1241}
1242
1243pub fn get_table_compaction_group_id_mapping(
1244    version: &HummockVersion,
1245) -> HashMap<StateTableId, CompactionGroupId> {
1246    version
1247        .state_table_info
1248        .info()
1249        .iter()
1250        .map(|(table_id, info)| (*table_id, info.compaction_group_id))
1251        .collect()
1252}
1253
1254/// Gets all SSTs in `group_id`
1255pub fn get_compaction_group_ssts(
1256    version: &HummockVersion,
1257    group_id: CompactionGroupId,
1258) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1259    let group_levels = version.get_compaction_group_levels(group_id);
1260    group_levels
1261        .l0
1262        .sub_levels
1263        .iter()
1264        .rev()
1265        .chain(group_levels.levels.iter())
1266        .flat_map(|level| {
1267            level
1268                .table_infos
1269                .iter()
1270                .map(|table_info| (table_info.object_id, table_info.sst_id))
1271        })
1272}
1273
1274pub fn new_sub_level(
1275    sub_level_id: u64,
1276    level_type: PbLevelType,
1277    table_infos: Vec<SstableInfo>,
1278) -> Level {
1279    if level_type == PbLevelType::Nonoverlapping {
1280        debug_assert!(
1281            can_concat(&table_infos),
1282            "sst of non-overlapping level is not concat-able: {:?}",
1283            table_infos
1284        );
1285    }
1286    let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1287    let uncompressed_file_size = table_infos
1288        .iter()
1289        .map(|table| table.uncompressed_file_size)
1290        .sum();
1291    Level {
1292        level_idx: 0,
1293        level_type,
1294        table_infos,
1295        total_file_size,
1296        sub_level_id,
1297        uncompressed_file_size,
1298        vnode_partition_count: 0,
1299    }
1300}
1301
1302pub fn add_ssts_to_sub_level(
1303    l0: &mut OverlappingLevel,
1304    sub_level_idx: usize,
1305    insert_table_infos: Vec<SstableInfo>,
1306) {
1307    insert_table_infos.iter().for_each(|sst| {
1308        l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1309        l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1310        l0.total_file_size += sst.sst_size;
1311        l0.uncompressed_file_size += sst.uncompressed_file_size;
1312    });
1313    l0.sub_levels[sub_level_idx]
1314        .table_infos
1315        .extend(insert_table_infos);
1316    if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1317        l0.sub_levels[sub_level_idx]
1318            .table_infos
1319            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1320        assert!(
1321            can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1322            "sstable ids: {:?}",
1323            l0.sub_levels[sub_level_idx]
1324                .table_infos
1325                .iter()
1326                .map(|sst| sst.sst_id)
1327                .collect_vec()
1328        );
1329    }
1330}
1331
1332/// `None` value of `sub_level_insert_hint` means append.
1333pub fn insert_new_sub_level(
1334    l0: &mut OverlappingLevel,
1335    insert_sub_level_id: u64,
1336    level_type: PbLevelType,
1337    insert_table_infos: Vec<SstableInfo>,
1338    sub_level_insert_hint: Option<usize>,
1339) {
1340    if insert_sub_level_id == u64::MAX {
1341        return;
1342    }
1343    let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1344        insert_pos
1345    } else {
1346        if let Some(newest_level) = l0.sub_levels.last() {
1347            assert!(
1348                newest_level.sub_level_id < insert_sub_level_id,
1349                "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1350                newest_level.sub_level_id,
1351                insert_sub_level_id,
1352                l0,
1353            );
1354        }
1355        l0.sub_levels.len()
1356    };
1357    #[cfg(debug_assertions)]
1358    {
1359        if insert_pos > 0
1360            && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1361        {
1362            debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1363        }
1364        if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1365            debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1366        }
1367    }
1368    // All files will be committed in one new Overlapping sub-level and become
1369    // Nonoverlapping  after at least one compaction.
1370    let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1371    l0.total_file_size += level.total_file_size;
1372    l0.uncompressed_file_size += level.uncompressed_file_size;
1373    l0.sub_levels.insert(insert_pos, level);
1374}
1375
1376impl Level {
1377    fn recompute_size(&mut self) {
1378        self.total_file_size = self
1379            .table_infos
1380            .iter()
1381            .map(|table| table.sst_size)
1382            .sum::<u64>();
1383        self.uncompressed_file_size = self
1384            .table_infos
1385            .iter()
1386            .map(|table| table.uncompressed_file_size)
1387            .sum::<u64>();
1388    }
1389
1390    /// Remove SSTs with empty `table_ids`, then recompute sizes.
1391    fn normalize(&mut self) {
1392        self.table_infos
1393            .retain(|sst_info| !sst_info.table_ids.is_empty());
1394        self.recompute_size();
1395    }
1396
1397    /// Return `true` if any SST was actually removed.
1398    fn delete_ssts(&mut self, ids: &HashSet<HummockSstableId>) -> bool {
1399        let original_len = self.table_infos.len();
1400        self.table_infos
1401            .retain(|table| !ids.contains(&table.sst_id));
1402        self.recompute_size();
1403        original_len != self.table_infos.len()
1404    }
1405
1406    /// Prune specified `table_ids` from each SST's metadata,
1407    /// remove SSTs that become empty, then recompute sizes.
1408    fn prune_table_ids_from_ssts(&mut self, table_ids: &HashSet<TableId>) {
1409        for sstable_info in &mut self.table_infos {
1410            if !sstable_info
1411                .table_ids
1412                .iter()
1413                .any(|table_id| table_ids.contains(table_id))
1414            {
1415                continue;
1416            }
1417
1418            let mut inner = sstable_info.get_inner();
1419            inner.table_ids.retain(|id| !table_ids.contains(id));
1420            sstable_info.set_inner(inner);
1421        }
1422        self.normalize();
1423    }
1424}
1425
1426impl OverlappingLevel {
1427    /// Remove empty sub-levels, then recompute aggregated sizes.
1428    fn normalize(&mut self) {
1429        self.sub_levels
1430            .retain(|level| !level.table_infos.is_empty());
1431        self.total_file_size = self
1432            .sub_levels
1433            .iter()
1434            .map(|level| level.total_file_size)
1435            .sum::<u64>();
1436        self.uncompressed_file_size = self
1437            .sub_levels
1438            .iter()
1439            .map(|level| level.uncompressed_file_size)
1440            .sum::<u64>();
1441    }
1442}
1443
1444fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1445    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1446        format!(
1447            "sstable ids: {:?}",
1448            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1449        )
1450    }
1451    operand.total_file_size += insert_table_infos
1452        .iter()
1453        .map(|sst| sst.sst_size)
1454        .sum::<u64>();
1455    operand.uncompressed_file_size += insert_table_infos
1456        .iter()
1457        .map(|sst| sst.uncompressed_file_size)
1458        .sum::<u64>();
1459    if operand.level_type == PbLevelType::Overlapping {
1460        operand.level_type = PbLevelType::Nonoverlapping;
1461        operand
1462            .table_infos
1463            .extend(insert_table_infos.iter().cloned());
1464        operand
1465            .table_infos
1466            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1467        assert!(
1468            can_concat(&operand.table_infos),
1469            "{}",
1470            display_sstable_infos(&operand.table_infos)
1471        );
1472    } else if !insert_table_infos.is_empty() {
1473        let sorted_insert: Vec<_> = insert_table_infos
1474            .iter()
1475            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1476            .cloned()
1477            .collect();
1478        let first = &sorted_insert[0];
1479        let last = &sorted_insert[sorted_insert.len() - 1];
1480        let pos = operand
1481            .table_infos
1482            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1483        if pos >= operand.table_infos.len()
1484            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1485        {
1486            operand.table_infos.splice(pos..pos, sorted_insert);
1487            // Validate the inserted SST batch along with the two SSTs that precede and follow it.
1488            let validate_range = operand
1489                .table_infos
1490                .iter()
1491                .skip(pos.saturating_sub(1))
1492                .take(insert_table_infos.len() + 2)
1493                .collect_vec();
1494            assert!(
1495                can_concat(&validate_range),
1496                "{}",
1497                display_sstable_infos(&validate_range),
1498            );
1499        } else {
1500            // If this branch is reached, it indicates some unexpected behavior in compaction.
1501            // Here we issue a warning and fall back to insert one by one.
1502            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1503            for i in insert_table_infos {
1504                let pos = operand
1505                    .table_infos
1506                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1507                operand.table_infos.insert(pos, i.clone());
1508            }
1509            assert!(
1510                can_concat(&operand.table_infos),
1511                "{}",
1512                display_sstable_infos(&operand.table_infos)
1513            );
1514        }
1515    }
1516}
1517
1518pub fn version_object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1519    // DO NOT REMOVE THIS LINE
1520    // This is to ensure that when adding new variant to `HummockObjectId`,
1521    // the compiler will warn us if we forget to handle it here.
1522    match HummockObjectId::Sstable(0.into()) {
1523        HummockObjectId::Sstable(_) => {}
1524        HummockObjectId::VectorFile(_) => {}
1525        HummockObjectId::HnswGraphFile(_) => {}
1526    };
1527    version
1528        .levels
1529        .values()
1530        .flat_map(|cg| {
1531            cg.level0()
1532                .sub_levels
1533                .iter()
1534                .chain(cg.levels.iter())
1535                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1536        })
1537        .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1538        .chain(
1539            version
1540                .vector_indexes
1541                .values()
1542                .flat_map(|index| index.get_objects()),
1543        )
1544        .collect()
1545}
1546
1547/// Verify the validity of a `HummockVersion` and return a list of violations if any.
1548/// Currently this method is only used by risectl validate-version.
1549pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1550    let mut res = Vec::new();
1551    // Ensure each table maps to only one compaction group
1552    for (group_id, levels) in &version.levels {
1553        // Ensure compaction group id matches
1554        if levels.group_id != *group_id {
1555            res.push(format!(
1556                "GROUP {}: inconsistent group id {} in Levels",
1557                group_id, levels.group_id
1558            ));
1559        }
1560
1561        let validate_level = |group: CompactionGroupId,
1562                              expected_level_idx: u32,
1563                              level: &Level,
1564                              res: &mut Vec<String>| {
1565            let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1566            if level.level_idx == 0 {
1567                level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1568                // Ensure sub-level is not empty
1569                if level.table_infos.is_empty() {
1570                    res.push(format!("{}: empty level", level_identifier));
1571                }
1572            } else if level.level_type != PbLevelType::Nonoverlapping {
1573                // Ensure non-L0 level is non-overlapping level
1574                res.push(format!(
1575                    "{}: level type {:?} is not non-overlapping",
1576                    level_identifier, level.level_type
1577                ));
1578            }
1579
1580            // Ensure level idx matches
1581            if level.level_idx != expected_level_idx {
1582                res.push(format!(
1583                    "{}: mismatched level idx {}",
1584                    level_identifier, expected_level_idx
1585                ));
1586            }
1587
1588            let mut prev_table_info: Option<&SstableInfo> = None;
1589            for table_info in &level.table_infos {
1590                // Ensure table_ids are sorted and unique
1591                if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1592                    res.push(format!(
1593                        "{} SST {}: table_ids not sorted",
1594                        level_identifier, table_info.object_id
1595                    ));
1596                }
1597
1598                // Ensure SSTs in non-overlapping level have non-overlapping key range
1599                if level.level_type == PbLevelType::Nonoverlapping {
1600                    if let Some(prev) = prev_table_info.take()
1601                        && prev
1602                            .key_range
1603                            .compare_right_with(&table_info.key_range.left)
1604                            != Ordering::Less
1605                    {
1606                        res.push(format!(
1607                            "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1608                            level_identifier, table_info.object_id, prev, table_info
1609                        ));
1610                    }
1611                    let _ = prev_table_info.insert(table_info);
1612                }
1613            }
1614        };
1615
1616        let l0 = &levels.l0;
1617        let mut prev_sub_level_id = u64::MAX;
1618        for sub_level in &l0.sub_levels {
1619            // Ensure sub_level_id is sorted and unique
1620            if sub_level.sub_level_id >= prev_sub_level_id {
1621                res.push(format!(
1622                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1623                    group_id, sub_level.level_idx, prev_sub_level_id
1624                ));
1625            }
1626            prev_sub_level_id = sub_level.sub_level_id;
1627
1628            validate_level(*group_id, 0, sub_level, &mut res);
1629        }
1630
1631        for idx in 1..=levels.levels.len() {
1632            validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1633        }
1634    }
1635    res
1636}
1637
1638#[cfg(test)]
1639mod tests {
1640    use std::collections::{HashMap, HashSet};
1641
1642    use bytes::Bytes;
1643    use risingwave_common::catalog::TableId;
1644    use risingwave_common::hash::VirtualNode;
1645    use risingwave_common::util::epoch::test_epoch;
1646    use risingwave_pb::hummock::{
1647        CompactionConfig, GroupConstruct, GroupDestroy, LevelType, StateTableInfo,
1648    };
1649
1650    use super::group_split;
1651    use crate::HummockVersionId;
1652    use crate::compaction_group::group_split::*;
1653    use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1654    use crate::key::{FullKey, gen_key_from_str};
1655    use crate::key_range::KeyRange;
1656    use crate::level::{Level, Levels, OverlappingLevel};
1657    use crate::sstable_info::{SstableInfo, SstableInfoInner};
1658    use crate::version::{
1659        GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo,
1660        IntraLevelDelta,
1661    };
1662
1663    fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1664        gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1665    }
1666
1667    fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1668        let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1669        let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1670        let full_key_l = FullKey::for_test(
1671            TableId::new(*table_ids.first().unwrap()),
1672            table_key_l,
1673            epoch,
1674        )
1675        .encode();
1676        let full_key_r =
1677            FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1678                .encode();
1679
1680        SstableInfoInner {
1681            sst_id: sst_id.into(),
1682            key_range: KeyRange {
1683                left: full_key_l.into(),
1684                right: full_key_r.into(),
1685                right_exclusive: false,
1686            },
1687            table_ids: table_ids.into_iter().map(Into::into).collect(),
1688            object_id: sst_id.into(),
1689            min_epoch: 20,
1690            max_epoch: 20,
1691            file_size: 100,
1692            sst_size: 100,
1693            ..Default::default()
1694        }
1695    }
1696
1697    #[test]
1698    fn test_get_sst_object_ids() {
1699        let mut version = HummockVersion {
1700            id: HummockVersionId::new(0),
1701            levels: HashMap::from_iter([(
1702                0.into(),
1703                Levels {
1704                    levels: vec![],
1705                    l0: OverlappingLevel {
1706                        sub_levels: vec![],
1707                        total_file_size: 0,
1708                        uncompressed_file_size: 0,
1709                    },
1710                    ..Default::default()
1711                },
1712            )]),
1713            ..Default::default()
1714        };
1715        assert_eq!(version.get_object_ids().count(), 0);
1716
1717        // Add to sub level
1718        version
1719            .levels
1720            .get_mut(&0)
1721            .unwrap()
1722            .l0
1723            .sub_levels
1724            .push(Level {
1725                table_infos: vec![
1726                    SstableInfoInner {
1727                        object_id: 11.into(),
1728                        sst_id: 11.into(),
1729                        ..Default::default()
1730                    }
1731                    .into(),
1732                ],
1733                ..Default::default()
1734            });
1735        assert_eq!(version.get_object_ids().count(), 1);
1736
1737        // Add to non sub level
1738        version.levels.get_mut(&0).unwrap().levels.push(Level {
1739            table_infos: vec![
1740                SstableInfoInner {
1741                    object_id: 22.into(),
1742                    sst_id: 22.into(),
1743                    ..Default::default()
1744                }
1745                .into(),
1746            ],
1747            ..Default::default()
1748        });
1749        assert_eq!(version.get_object_ids().count(), 2);
1750    }
1751
1752    #[test]
1753    fn test_apply_version_delta() {
1754        let mut version = HummockVersion {
1755            id: HummockVersionId::new(0),
1756            levels: HashMap::from_iter([
1757                (
1758                    0.into(),
1759                    build_initial_compaction_group_levels(
1760                        0,
1761                        &CompactionConfig {
1762                            max_level: 6,
1763                            ..Default::default()
1764                        },
1765                    ),
1766                ),
1767                (
1768                    1.into(),
1769                    build_initial_compaction_group_levels(
1770                        1,
1771                        &CompactionConfig {
1772                            max_level: 6,
1773                            ..Default::default()
1774                        },
1775                    ),
1776                ),
1777            ]),
1778            ..Default::default()
1779        };
1780        let version_delta = HummockVersionDelta {
1781            id: HummockVersionId::new(1),
1782            group_deltas: HashMap::from_iter([
1783                (
1784                    2.into(),
1785                    GroupDeltas {
1786                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1787                            group_config: Some(CompactionConfig {
1788                                max_level: 6,
1789                                ..Default::default()
1790                            }),
1791                            ..Default::default()
1792                        }))],
1793                    },
1794                ),
1795                (
1796                    0.into(),
1797                    GroupDeltas {
1798                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1799                    },
1800                ),
1801                (
1802                    1.into(),
1803                    GroupDeltas {
1804                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1805                            1,
1806                            0,
1807                            HashSet::new(),
1808                            vec![
1809                                SstableInfoInner {
1810                                    object_id: 1.into(),
1811                                    sst_id: 1.into(),
1812                                    ..Default::default()
1813                                }
1814                                .into(),
1815                            ],
1816                            0,
1817                            version
1818                                .levels
1819                                .get(&1)
1820                                .as_ref()
1821                                .unwrap()
1822                                .compaction_group_version_id,
1823                        ))],
1824                    },
1825                ),
1826            ]),
1827            ..Default::default()
1828        };
1829        let version_delta = version_delta;
1830
1831        version.apply_version_delta(&version_delta);
1832        let mut cg1 = build_initial_compaction_group_levels(
1833            1,
1834            &CompactionConfig {
1835                max_level: 6,
1836                ..Default::default()
1837            },
1838        );
1839        cg1.levels[0] = Level {
1840            level_idx: 1,
1841            level_type: LevelType::Nonoverlapping,
1842            table_infos: vec![
1843                SstableInfoInner {
1844                    object_id: 1.into(),
1845                    sst_id: 1.into(),
1846                    ..Default::default()
1847                }
1848                .into(),
1849            ],
1850            ..Default::default()
1851        };
1852        assert_eq!(
1853            version,
1854            HummockVersion {
1855                id: HummockVersionId::new(1),
1856                levels: HashMap::from_iter([
1857                    (
1858                        2.into(),
1859                        build_initial_compaction_group_levels(
1860                            2,
1861                            &CompactionConfig {
1862                                max_level: 6,
1863                                ..Default::default()
1864                            },
1865                        ),
1866                    ),
1867                    (1.into(), cg1),
1868                ]),
1869                ..Default::default()
1870            }
1871        );
1872    }
1873
1874    fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1875        gen_sst_info_impl(object_id, table_ids, left, right).into()
1876    }
1877
1878    fn gen_sst_info_impl(
1879        object_id: u64,
1880        table_ids: Vec<u32>,
1881        left: Bytes,
1882        right: Bytes,
1883    ) -> SstableInfoInner {
1884        SstableInfoInner {
1885            object_id: object_id.into(),
1886            sst_id: object_id.into(),
1887            key_range: KeyRange {
1888                left,
1889                right,
1890                right_exclusive: false,
1891            },
1892            table_ids: table_ids.into_iter().map(Into::into).collect(),
1893            file_size: 100,
1894            sst_size: 100,
1895            uncompressed_file_size: 100,
1896            ..Default::default()
1897        }
1898    }
1899
1900    #[test]
1901    fn test_merge_levels() {
1902        let mut left_levels = build_initial_compaction_group_levels(
1903            1,
1904            &CompactionConfig {
1905                max_level: 6,
1906                ..Default::default()
1907            },
1908        );
1909
1910        let mut right_levels = build_initial_compaction_group_levels(
1911            2,
1912            &CompactionConfig {
1913                max_level: 6,
1914                ..Default::default()
1915            },
1916        );
1917
1918        left_levels.levels[0] = Level {
1919            level_idx: 1,
1920            level_type: LevelType::Nonoverlapping,
1921            table_infos: vec![
1922                gen_sst_info(
1923                    1,
1924                    vec![3],
1925                    FullKey::for_test(
1926                        TableId::new(3),
1927                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1928                        0,
1929                    )
1930                    .encode()
1931                    .into(),
1932                    FullKey::for_test(
1933                        TableId::new(3),
1934                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1935                        0,
1936                    )
1937                    .encode()
1938                    .into(),
1939                ),
1940                gen_sst_info(
1941                    10,
1942                    vec![3, 4],
1943                    FullKey::for_test(
1944                        TableId::new(3),
1945                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1946                        0,
1947                    )
1948                    .encode()
1949                    .into(),
1950                    FullKey::for_test(
1951                        TableId::new(4),
1952                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1953                        0,
1954                    )
1955                    .encode()
1956                    .into(),
1957                ),
1958                gen_sst_info(
1959                    11,
1960                    vec![4],
1961                    FullKey::for_test(
1962                        TableId::new(4),
1963                        gen_key_from_str(VirtualNode::from_index(11), "1"),
1964                        0,
1965                    )
1966                    .encode()
1967                    .into(),
1968                    FullKey::for_test(
1969                        TableId::new(4),
1970                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1971                        0,
1972                    )
1973                    .encode()
1974                    .into(),
1975                ),
1976            ],
1977            total_file_size: 300,
1978            ..Default::default()
1979        };
1980
1981        left_levels.l0.sub_levels.push(Level {
1982            level_idx: 0,
1983            table_infos: vec![gen_sst_info(
1984                3,
1985                vec![3],
1986                FullKey::for_test(
1987                    TableId::new(3),
1988                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1989                    0,
1990                )
1991                .encode()
1992                .into(),
1993                FullKey::for_test(
1994                    TableId::new(3),
1995                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1996                    0,
1997                )
1998                .encode()
1999                .into(),
2000            )],
2001            sub_level_id: 101,
2002            level_type: LevelType::Overlapping,
2003            total_file_size: 100,
2004            ..Default::default()
2005        });
2006
2007        left_levels.l0.sub_levels.push(Level {
2008            level_idx: 0,
2009            table_infos: vec![gen_sst_info(
2010                3,
2011                vec![3],
2012                FullKey::for_test(
2013                    TableId::new(3),
2014                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2015                    0,
2016                )
2017                .encode()
2018                .into(),
2019                FullKey::for_test(
2020                    TableId::new(3),
2021                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2022                    0,
2023                )
2024                .encode()
2025                .into(),
2026            )],
2027            sub_level_id: 103,
2028            level_type: LevelType::Overlapping,
2029            total_file_size: 100,
2030            ..Default::default()
2031        });
2032
2033        left_levels.l0.sub_levels.push(Level {
2034            level_idx: 0,
2035            table_infos: vec![gen_sst_info(
2036                3,
2037                vec![3],
2038                FullKey::for_test(
2039                    TableId::new(3),
2040                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2041                    0,
2042                )
2043                .encode()
2044                .into(),
2045                FullKey::for_test(
2046                    TableId::new(3),
2047                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2048                    0,
2049                )
2050                .encode()
2051                .into(),
2052            )],
2053            sub_level_id: 105,
2054            level_type: LevelType::Nonoverlapping,
2055            total_file_size: 100,
2056            ..Default::default()
2057        });
2058
2059        right_levels.levels[0] = Level {
2060            level_idx: 1,
2061            level_type: LevelType::Nonoverlapping,
2062            table_infos: vec![
2063                gen_sst_info(
2064                    1,
2065                    vec![5],
2066                    FullKey::for_test(
2067                        TableId::new(5),
2068                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2069                        0,
2070                    )
2071                    .encode()
2072                    .into(),
2073                    FullKey::for_test(
2074                        TableId::new(5),
2075                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2076                        0,
2077                    )
2078                    .encode()
2079                    .into(),
2080                ),
2081                gen_sst_info(
2082                    10,
2083                    vec![5, 6],
2084                    FullKey::for_test(
2085                        TableId::new(5),
2086                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2087                        0,
2088                    )
2089                    .encode()
2090                    .into(),
2091                    FullKey::for_test(
2092                        TableId::new(6),
2093                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2094                        0,
2095                    )
2096                    .encode()
2097                    .into(),
2098                ),
2099                gen_sst_info(
2100                    11,
2101                    vec![6],
2102                    FullKey::for_test(
2103                        TableId::new(6),
2104                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2105                        0,
2106                    )
2107                    .encode()
2108                    .into(),
2109                    FullKey::for_test(
2110                        TableId::new(6),
2111                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2112                        0,
2113                    )
2114                    .encode()
2115                    .into(),
2116                ),
2117            ],
2118            total_file_size: 300,
2119            ..Default::default()
2120        };
2121
2122        right_levels.l0.sub_levels.push(Level {
2123            level_idx: 0,
2124            table_infos: vec![gen_sst_info(
2125                3,
2126                vec![5],
2127                FullKey::for_test(
2128                    TableId::new(5),
2129                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2130                    0,
2131                )
2132                .encode()
2133                .into(),
2134                FullKey::for_test(
2135                    TableId::new(5),
2136                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2137                    0,
2138                )
2139                .encode()
2140                .into(),
2141            )],
2142            sub_level_id: 101,
2143            level_type: LevelType::Overlapping,
2144            total_file_size: 100,
2145            ..Default::default()
2146        });
2147
2148        right_levels.l0.sub_levels.push(Level {
2149            level_idx: 0,
2150            table_infos: vec![gen_sst_info(
2151                5,
2152                vec![5],
2153                FullKey::for_test(
2154                    TableId::new(5),
2155                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2156                    0,
2157                )
2158                .encode()
2159                .into(),
2160                FullKey::for_test(
2161                    TableId::new(5),
2162                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2163                    0,
2164                )
2165                .encode()
2166                .into(),
2167            )],
2168            sub_level_id: 102,
2169            level_type: LevelType::Overlapping,
2170            total_file_size: 100,
2171            ..Default::default()
2172        });
2173
2174        right_levels.l0.sub_levels.push(Level {
2175            level_idx: 0,
2176            table_infos: vec![gen_sst_info(
2177                3,
2178                vec![5],
2179                FullKey::for_test(
2180                    TableId::new(5),
2181                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2182                    0,
2183                )
2184                .encode()
2185                .into(),
2186                FullKey::for_test(
2187                    TableId::new(5),
2188                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2189                    0,
2190                )
2191                .encode()
2192                .into(),
2193            )],
2194            sub_level_id: 103,
2195            level_type: LevelType::Nonoverlapping,
2196            total_file_size: 100,
2197            ..Default::default()
2198        });
2199
2200        {
2201            // test empty
2202            let mut left_levels = Levels::default();
2203            let right_levels = Levels::default();
2204
2205            group_split::merge_levels(&mut left_levels, right_levels);
2206        }
2207
2208        {
2209            // test empty left
2210            let mut left_levels = build_initial_compaction_group_levels(
2211                1,
2212                &CompactionConfig {
2213                    max_level: 6,
2214                    ..Default::default()
2215                },
2216            );
2217            let right_levels = right_levels.clone();
2218
2219            group_split::merge_levels(&mut left_levels, right_levels);
2220
2221            assert!(left_levels.l0.sub_levels.len() == 3);
2222            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2223            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2224            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2225            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2226            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2227            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2228
2229            assert!(left_levels.levels[0].level_idx == 1);
2230            assert_eq!(300, left_levels.levels[0].total_file_size);
2231        }
2232
2233        {
2234            // test empty right
2235            let mut left_levels = left_levels.clone();
2236            let right_levels = build_initial_compaction_group_levels(
2237                2,
2238                &CompactionConfig {
2239                    max_level: 6,
2240                    ..Default::default()
2241                },
2242            );
2243
2244            group_split::merge_levels(&mut left_levels, right_levels);
2245
2246            assert!(left_levels.l0.sub_levels.len() == 3);
2247            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2248            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2249            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2250            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2251            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2252            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2253
2254            assert!(left_levels.levels[0].level_idx == 1);
2255            assert_eq!(300, left_levels.levels[0].total_file_size);
2256        }
2257
2258        {
2259            let mut left_levels = left_levels.clone();
2260            let right_levels = right_levels.clone();
2261
2262            group_split::merge_levels(&mut left_levels, right_levels);
2263
2264            assert!(left_levels.l0.sub_levels.len() == 6);
2265            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2266            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2267            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2268            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2269            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2270            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2271            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2272            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2273            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2274            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2275            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2276            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2277
2278            assert!(left_levels.levels[0].level_idx == 1);
2279            assert_eq!(600, left_levels.levels[0].total_file_size);
2280        }
2281    }
2282
2283    #[test]
2284    fn test_get_split_pos() {
2285        let epoch = test_epoch(1);
2286        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2287        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2288        let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2289
2290        let ssts = vec![s1, s2, s3];
2291        let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2292
2293        let pos = group_split::get_split_pos(&ssts, split_key.clone());
2294        assert_eq!(1, pos);
2295
2296        let pos = group_split::get_split_pos(&vec![], split_key);
2297        assert_eq!(0, pos);
2298    }
2299
2300    #[test]
2301    fn test_split_sst() {
2302        let epoch = test_epoch(1);
2303        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2304
2305        {
2306            let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
2307            let origin_sst = sst.clone();
2308            let sst_size = origin_sst.sst_size;
2309            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2310            assert_eq!(SstSplitType::Both, split_type);
2311
2312            let mut new_sst_id = 10.into();
2313            let (origin_sst, branched_sst) = group_split::split_sst(
2314                origin_sst,
2315                &mut new_sst_id,
2316                split_key,
2317                sst_size / 2,
2318                sst_size / 2,
2319            );
2320
2321            let origin_sst = origin_sst.unwrap();
2322            let branched_sst = branched_sst.unwrap();
2323
2324            assert!(origin_sst.key_range.right_exclusive);
2325            assert!(
2326                origin_sst
2327                    .key_range
2328                    .right
2329                    .cmp(&branched_sst.key_range.left)
2330                    .is_le()
2331            );
2332            assert!(origin_sst.table_ids.is_sorted());
2333            assert!(branched_sst.table_ids.is_sorted());
2334            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2335            assert!(branched_sst.sst_size < origin_sst.file_size);
2336            assert_eq!(10, branched_sst.sst_id);
2337            assert_eq!(11, origin_sst.sst_id);
2338            assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2339        }
2340
2341        {
2342            // test un-exist table_id
2343            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2344            let origin_sst = sst.clone();
2345            let sst_size = origin_sst.sst_size;
2346            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2347            assert_eq!(SstSplitType::Both, split_type);
2348
2349            let mut new_sst_id = 10.into();
2350            let (origin_sst, branched_sst) = group_split::split_sst(
2351                origin_sst,
2352                &mut new_sst_id,
2353                split_key,
2354                sst_size / 2,
2355                sst_size / 2,
2356            );
2357
2358            let origin_sst = origin_sst.unwrap();
2359            let branched_sst = branched_sst.unwrap();
2360
2361            assert!(origin_sst.key_range.right_exclusive);
2362            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2363            assert!(origin_sst.table_ids.is_sorted());
2364            assert!(branched_sst.table_ids.is_sorted());
2365            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2366            assert!(branched_sst.sst_size < origin_sst.file_size);
2367            assert_eq!(10, branched_sst.sst_id);
2368            assert_eq!(11, origin_sst.sst_id);
2369            assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2370        }
2371
2372        {
2373            let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
2374            let split_type = group_split::need_to_split(&sst, split_key);
2375            assert_eq!(SstSplitType::Left, split_type);
2376        }
2377
2378        {
2379            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2380            let origin_sst = sst.clone();
2381            let split_type = group_split::need_to_split(&origin_sst, split_key);
2382            assert_eq!(SstSplitType::Both, split_type);
2383
2384            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2385            let origin_sst = sst;
2386            let split_type = group_split::need_to_split(&origin_sst, split_key);
2387            assert_eq!(SstSplitType::Right, split_type);
2388        }
2389
2390        {
2391            // test key_range left = right
2392            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2393            sst.key_range.right = sst.key_range.left.clone();
2394            let sst: SstableInfo = sst.into();
2395            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2396            let origin_sst = sst;
2397            let sst_size = origin_sst.sst_size;
2398
2399            let mut new_sst_id = 10.into();
2400            let (origin_sst, branched_sst) = group_split::split_sst(
2401                origin_sst,
2402                &mut new_sst_id,
2403                split_key,
2404                sst_size / 2,
2405                sst_size / 2,
2406            );
2407
2408            assert!(origin_sst.is_none());
2409            assert!(branched_sst.is_some());
2410        }
2411    }
2412
2413    #[test]
2414    fn test_split_sst_info_for_level() {
2415        let mut version = HummockVersion {
2416            id: HummockVersionId::new(0),
2417            levels: HashMap::from_iter([(
2418                1.into(),
2419                build_initial_compaction_group_levels(
2420                    1,
2421                    &CompactionConfig {
2422                        max_level: 6,
2423                        ..Default::default()
2424                    },
2425                ),
2426            )]),
2427            ..Default::default()
2428        };
2429
2430        let cg1 = version.levels.get_mut(&1).unwrap();
2431
2432        cg1.levels[0] = Level {
2433            level_idx: 1,
2434            level_type: LevelType::Nonoverlapping,
2435            table_infos: vec![
2436                gen_sst_info(
2437                    1,
2438                    vec![3],
2439                    FullKey::for_test(
2440                        TableId::new(3),
2441                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2442                        0,
2443                    )
2444                    .encode()
2445                    .into(),
2446                    FullKey::for_test(
2447                        TableId::new(3),
2448                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2449                        0,
2450                    )
2451                    .encode()
2452                    .into(),
2453                ),
2454                gen_sst_info(
2455                    10,
2456                    vec![3, 4],
2457                    FullKey::for_test(
2458                        TableId::new(3),
2459                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2460                        0,
2461                    )
2462                    .encode()
2463                    .into(),
2464                    FullKey::for_test(
2465                        TableId::new(4),
2466                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2467                        0,
2468                    )
2469                    .encode()
2470                    .into(),
2471                ),
2472                gen_sst_info(
2473                    11,
2474                    vec![4],
2475                    FullKey::for_test(
2476                        TableId::new(4),
2477                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2478                        0,
2479                    )
2480                    .encode()
2481                    .into(),
2482                    FullKey::for_test(
2483                        TableId::new(4),
2484                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2485                        0,
2486                    )
2487                    .encode()
2488                    .into(),
2489                ),
2490            ],
2491            total_file_size: 300,
2492            ..Default::default()
2493        };
2494
2495        cg1.l0.sub_levels.push(Level {
2496            level_idx: 0,
2497            table_infos: vec![
2498                gen_sst_info(
2499                    2,
2500                    vec![2],
2501                    FullKey::for_test(
2502                        TableId::new(0),
2503                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2504                        0,
2505                    )
2506                    .encode()
2507                    .into(),
2508                    FullKey::for_test(
2509                        TableId::new(2),
2510                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2511                        0,
2512                    )
2513                    .encode()
2514                    .into(),
2515                ),
2516                gen_sst_info(
2517                    22,
2518                    vec![2],
2519                    FullKey::for_test(
2520                        TableId::new(0),
2521                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2522                        0,
2523                    )
2524                    .encode()
2525                    .into(),
2526                    FullKey::for_test(
2527                        TableId::new(2),
2528                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2529                        0,
2530                    )
2531                    .encode()
2532                    .into(),
2533                ),
2534                gen_sst_info(
2535                    23,
2536                    vec![2],
2537                    FullKey::for_test(
2538                        TableId::new(0),
2539                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2540                        0,
2541                    )
2542                    .encode()
2543                    .into(),
2544                    FullKey::for_test(
2545                        TableId::new(2),
2546                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2547                        0,
2548                    )
2549                    .encode()
2550                    .into(),
2551                ),
2552                gen_sst_info(
2553                    24,
2554                    vec![2],
2555                    FullKey::for_test(
2556                        TableId::new(2),
2557                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2558                        0,
2559                    )
2560                    .encode()
2561                    .into(),
2562                    FullKey::for_test(
2563                        TableId::new(2),
2564                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2565                        0,
2566                    )
2567                    .encode()
2568                    .into(),
2569                ),
2570                gen_sst_info(
2571                    25,
2572                    vec![2],
2573                    FullKey::for_test(
2574                        TableId::new(0),
2575                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2576                        0,
2577                    )
2578                    .encode()
2579                    .into(),
2580                    FullKey::for_test(
2581                        TableId::new(0),
2582                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2583                        0,
2584                    )
2585                    .encode()
2586                    .into(),
2587                ),
2588            ],
2589            sub_level_id: 101,
2590            level_type: LevelType::Overlapping,
2591            total_file_size: 300,
2592            ..Default::default()
2593        });
2594
2595        {
2596            // split Overlapping level
2597            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2598
2599            let mut new_sst_id = 100.into();
2600            let x = group_split::split_sst_info_for_level_v2(
2601                &mut cg1.l0.sub_levels[0],
2602                &mut new_sst_id,
2603                split_key,
2604            );
2605            // assert_eq!(3, x.len());
2606            // assert_eq!(100, x[0].sst_id);
2607            // assert_eq!(100, x[0].sst_size);
2608            // assert_eq!(101, x[1].sst_id);
2609            // assert_eq!(100, x[1].sst_size);
2610            // assert_eq!(102, x[2].sst_id);
2611            // assert_eq!(100, x[2].sst_size);
2612
2613            let mut right_l0 = OverlappingLevel {
2614                sub_levels: vec![],
2615                total_file_size: 0,
2616                uncompressed_file_size: 0,
2617            };
2618
2619            right_l0.sub_levels.push(Level {
2620                level_idx: 0,
2621                table_infos: x,
2622                sub_level_id: 101,
2623                total_file_size: 100,
2624                level_type: LevelType::Overlapping,
2625                ..Default::default()
2626            });
2627
2628            let right_levels = Levels {
2629                levels: vec![],
2630                l0: right_l0,
2631                ..Default::default()
2632            };
2633
2634            merge_levels(cg1, right_levels);
2635        }
2636
2637        {
2638            // test split empty level
2639            let mut new_sst_id = 100.into();
2640            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2641            let x = group_split::split_sst_info_for_level_v2(
2642                &mut cg1.levels[2],
2643                &mut new_sst_id,
2644                split_key,
2645            );
2646
2647            assert!(x.is_empty());
2648        }
2649
2650        {
2651            // test split to right Nonoverlapping level
2652            let mut cg1 = cg1.clone();
2653            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2654
2655            let mut new_sst_id = 100.into();
2656            let x = group_split::split_sst_info_for_level_v2(
2657                &mut cg1.levels[0],
2658                &mut new_sst_id,
2659                split_key,
2660            );
2661
2662            assert_eq!(3, x.len());
2663            assert_eq!(1, x[0].sst_id);
2664            assert_eq!(100, x[0].sst_size);
2665            assert_eq!(10, x[1].sst_id);
2666            assert_eq!(100, x[1].sst_size);
2667            assert_eq!(11, x[2].sst_id);
2668            assert_eq!(100, x[2].sst_size);
2669
2670            assert_eq!(0, cg1.levels[0].table_infos.len());
2671        }
2672
2673        {
2674            // test split to left Nonoverlapping level
2675            let mut cg1 = cg1.clone();
2676            let split_key = group_split::build_split_key(5.into(), VirtualNode::ZERO);
2677
2678            let mut new_sst_id = 100.into();
2679            let x = group_split::split_sst_info_for_level_v2(
2680                &mut cg1.levels[0],
2681                &mut new_sst_id,
2682                split_key,
2683            );
2684
2685            assert_eq!(0, x.len());
2686            assert_eq!(3, cg1.levels[0].table_infos.len());
2687        }
2688
2689        // {
2690        //     // test split to both Nonoverlapping level
2691        //     let mut cg1 = cg1.clone();
2692        //     let split_key = build_split_key(3, VirtualNode::MAX);
2693
2694        //     let mut new_sst_id = 100;
2695        //     let x = group_split::split_sst_info_for_level_v2(
2696        //         &mut cg1.levels[0],
2697        //         &mut new_sst_id,
2698        //         split_key,
2699        //     );
2700
2701        //     assert_eq!(2, x.len());
2702        //     assert_eq!(100, x[0].sst_id);
2703        //     assert_eq!(100 / 2, x[0].sst_size);
2704        //     assert_eq!(11, x[1].sst_id);
2705        //     assert_eq!(100, x[1].sst_size);
2706        //     assert_eq!(vec![3, 4], x[0].table_ids);
2707
2708        //     assert_eq!(2, cg1.levels[0].table_infos.len());
2709        //     assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2710        //     assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2711        //     assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2712        // }
2713
2714        {
2715            // test split to both Nonoverlapping level
2716            let mut cg1 = cg1.clone();
2717            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2718
2719            let mut new_sst_id = 100.into();
2720            let x = group_split::split_sst_info_for_level_v2(
2721                &mut cg1.levels[0],
2722                &mut new_sst_id,
2723                split_key,
2724            );
2725
2726            assert_eq!(2, x.len());
2727            assert_eq!(100, x[0].sst_id);
2728            assert_eq!(100 / 2, x[0].sst_size);
2729            assert_eq!(11, x[1].sst_id);
2730            assert_eq!(100, x[1].sst_size);
2731            assert_eq!(vec![TableId::new(4)], x[1].table_ids);
2732
2733            assert_eq!(2, cg1.levels[0].table_infos.len());
2734            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2735            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2736            assert_eq!(
2737                vec![TableId::new(3)],
2738                cg1.levels[0].table_infos[1].table_ids
2739            );
2740        }
2741    }
2742
2743    fn make_sst(sst_id: u64, table_ids: Vec<u32>, sst_size: u64) -> SstableInfo {
2744        SstableInfoInner {
2745            sst_id: sst_id.into(),
2746            object_id: sst_id.into(),
2747            table_ids: table_ids.into_iter().map(TableId::new).collect(),
2748            file_size: sst_size,
2749            sst_size,
2750            uncompressed_file_size: sst_size * 2,
2751            ..Default::default()
2752        }
2753        .into()
2754    }
2755
2756    #[test]
2757    fn test_level_normalize() {
2758        // Mixed: some SSTs empty, some not.
2759        let mut level = Level {
2760            level_idx: 1,
2761            level_type: LevelType::Nonoverlapping,
2762            table_infos: vec![
2763                make_sst(1, vec![1, 2], 100),
2764                make_sst(2, vec![], 200), // empty → removed
2765                make_sst(3, vec![3], 300),
2766                make_sst(4, vec![], 400), // empty → removed
2767            ],
2768            total_file_size: 9999, // intentionally wrong
2769            uncompressed_file_size: 9999,
2770            ..Default::default()
2771        };
2772
2773        level.normalize();
2774
2775        assert_eq!(level.table_infos.len(), 2);
2776        assert_eq!(1, level.table_infos[0].sst_id);
2777        assert_eq!(3, level.table_infos[1].sst_id);
2778        assert_eq!(level.total_file_size, 400);
2779        assert_eq!(level.uncompressed_file_size, 800);
2780
2781        // No empty SSTs: just recompute sizes.
2782        level.total_file_size = 0;
2783        level.uncompressed_file_size = 0;
2784        level.normalize();
2785        assert_eq!(level.table_infos.len(), 2);
2786        assert_eq!(level.total_file_size, 400);
2787        assert_eq!(level.uncompressed_file_size, 800);
2788
2789        // All empty → level becomes empty.
2790        level.table_infos = vec![make_sst(10, vec![], 100), make_sst(11, vec![], 200)];
2791        level.normalize();
2792        assert!(level.table_infos.is_empty());
2793        assert_eq!(level.total_file_size, 0);
2794        assert_eq!(level.uncompressed_file_size, 0);
2795    }
2796
2797    #[test]
2798    fn test_level_delete_ssts() {
2799        let mut level = Level {
2800            level_idx: 1,
2801            level_type: LevelType::Nonoverlapping,
2802            table_infos: vec![
2803                make_sst(1, vec![1], 100),
2804                make_sst(2, vec![2], 200),
2805                make_sst(3, vec![3], 300),
2806            ],
2807            total_file_size: 600,
2808            uncompressed_file_size: 1200,
2809            ..Default::default()
2810        };
2811
2812        let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([2.into()]);
2813        let changed = level.delete_ssts(&delete_ids);
2814
2815        assert!(changed);
2816        assert_eq!(level.table_infos.len(), 2);
2817        assert_eq!(1, level.table_infos[0].sst_id);
2818        assert_eq!(3, level.table_infos[1].sst_id);
2819        assert_eq!(level.total_file_size, 400);
2820        assert_eq!(level.uncompressed_file_size, 800);
2821
2822        // Delete non-existent id → no change, returns false.
2823        let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([999.into()]);
2824        let changed = level.delete_ssts(&delete_ids);
2825        assert!(!changed);
2826        assert_eq!(level.table_infos.len(), 2);
2827    }
2828
2829    #[test]
2830    fn test_level_prune_table_ids_from_ssts() {
2831        let mut level = Level {
2832            level_idx: 1,
2833            level_type: LevelType::Nonoverlapping,
2834            table_infos: vec![
2835                make_sst(1, vec![1, 2], 100),
2836                make_sst(2, vec![2, 3], 200),
2837                make_sst(3, vec![2], 300),
2838            ],
2839            total_file_size: 600,
2840            uncompressed_file_size: 1200,
2841            ..Default::default()
2842        };
2843
2844        let pruned_table_ids = HashSet::from([TableId::new(2)]);
2845        level.prune_table_ids_from_ssts(&pruned_table_ids);
2846
2847        // SST 3 should be removed (was only table_id=2)
2848        assert_eq!(level.table_infos.len(), 2);
2849        assert_eq!(level.table_infos[0].table_ids, vec![TableId::new(1)]);
2850        assert_eq!(level.table_infos[1].table_ids, vec![TableId::new(3)]);
2851        assert_eq!(level.total_file_size, 100 + 200);
2852        assert_eq!(level.uncompressed_file_size, 200 + 400);
2853
2854        // Prune remaining tables, so all SSTs are removed.
2855        let pruned_table_ids = HashSet::from([TableId::new(1), TableId::new(3)]);
2856        level.prune_table_ids_from_ssts(&pruned_table_ids);
2857        assert!(level.table_infos.is_empty());
2858        assert_eq!(level.total_file_size, 0);
2859        assert_eq!(level.uncompressed_file_size, 0);
2860    }
2861
2862    #[test]
2863    fn test_overlapping_level_normalize() {
2864        let mut l0 = OverlappingLevel {
2865            sub_levels: vec![
2866                Level {
2867                    level_idx: 0,
2868                    table_infos: vec![make_sst(1, vec![1], 100)],
2869                    total_file_size: 100,
2870                    uncompressed_file_size: 200,
2871                    sub_level_id: 1,
2872                    ..Default::default()
2873                },
2874                Level {
2875                    level_idx: 0,
2876                    table_infos: vec![], // empty → should be removed
2877                    total_file_size: 0,
2878                    uncompressed_file_size: 0,
2879                    sub_level_id: 2,
2880                    ..Default::default()
2881                },
2882                Level {
2883                    level_idx: 0,
2884                    table_infos: vec![make_sst(3, vec![3], 300)],
2885                    total_file_size: 300,
2886                    uncompressed_file_size: 600,
2887                    sub_level_id: 3,
2888                    ..Default::default()
2889                },
2890            ],
2891            total_file_size: 9999, // intentionally wrong
2892            uncompressed_file_size: 9999,
2893        };
2894
2895        l0.normalize();
2896
2897        assert_eq!(l0.sub_levels.len(), 2);
2898        assert_eq!(l0.sub_levels[0].sub_level_id, 1);
2899        assert_eq!(l0.sub_levels[1].sub_level_id, 3);
2900        assert_eq!(l0.total_file_size, 100 + 300);
2901        assert_eq!(l0.uncompressed_file_size, 200 + 600);
2902
2903        // All sub-levels empty → normalize clears everything.
2904        l0.sub_levels = vec![Level {
2905            level_idx: 0,
2906            table_infos: vec![],
2907            ..Default::default()
2908        }];
2909        l0.normalize();
2910        assert!(l0.sub_levels.is_empty());
2911        assert_eq!(l0.total_file_size, 0);
2912        assert_eq!(l0.uncompressed_file_size, 0);
2913    }
2914
2915    #[test]
2916    fn test_levels_prune_table_ids_from_ssts() {
2917        #[expect(deprecated)]
2918        let mut levels = Levels {
2919            l0: OverlappingLevel {
2920                sub_levels: vec![
2921                    Level {
2922                        level_idx: 0,
2923                        table_infos: vec![
2924                            make_sst(1, vec![10], 100), // table 10
2925                            make_sst(2, vec![20], 200), // table 20
2926                        ],
2927                        total_file_size: 300,
2928                        uncompressed_file_size: 600,
2929                        sub_level_id: 1,
2930                        ..Default::default()
2931                    },
2932                    Level {
2933                        level_idx: 0,
2934                        table_infos: vec![
2935                            make_sst(3, vec![10], 150), // table 10
2936                        ],
2937                        total_file_size: 150,
2938                        uncompressed_file_size: 300,
2939                        sub_level_id: 2,
2940                        ..Default::default()
2941                    },
2942                ],
2943                total_file_size: 450,
2944                uncompressed_file_size: 900,
2945            },
2946            levels: vec![Level {
2947                level_idx: 1,
2948                level_type: LevelType::Nonoverlapping,
2949                table_infos: vec![
2950                    make_sst(4, vec![10, 20], 400), // shared SST
2951                    make_sst(5, vec![10], 500),     // table 10 only
2952                ],
2953                total_file_size: 900,
2954                uncompressed_file_size: 1800,
2955                ..Default::default()
2956            }],
2957            group_id: 1.into(),
2958            parent_group_id: 0.into(),
2959            member_table_ids: vec![],
2960            compaction_group_version_id: 0,
2961        };
2962
2963        // Prune table 10 from SST metadata.
2964        levels.prune_table_ids_from_ssts(&HashSet::from([TableId::new(10)]));
2965
2966        assert_eq!(levels.l0.sub_levels.len(), 1);
2967        assert_eq!(levels.l0.sub_levels[0].sub_level_id, 1);
2968        assert_eq!(levels.l0.sub_levels[0].table_infos.len(), 1);
2969        assert_eq!(2, levels.l0.sub_levels[0].table_infos[0].sst_id);
2970        assert_eq!(levels.l0.sub_levels[0].total_file_size, 200);
2971        assert_eq!(levels.l0.sub_levels[0].uncompressed_file_size, 400);
2972
2973        assert_eq!(levels.l0.total_file_size, 200);
2974        assert_eq!(levels.l0.uncompressed_file_size, 400);
2975
2976        assert_eq!(levels.levels[0].table_infos.len(), 1);
2977        assert_eq!(4, levels.levels[0].table_infos[0].sst_id);
2978        assert_eq!(
2979            levels.levels[0].table_infos[0].table_ids,
2980            vec![TableId::new(20)]
2981        );
2982        assert_eq!(levels.levels[0].total_file_size, 400);
2983        assert_eq!(levels.levels[0].uncompressed_file_size, 800);
2984
2985        assert_eq!(levels.compaction_group_version_id, 1);
2986    }
2987
2988    #[test]
2989    fn test_apply_version_delta_prune_table_ids_from_ssts() {
2990        let mut version = HummockVersion {
2991            id: HummockVersionId::new(0),
2992            levels: HashMap::from_iter([(1.into(), {
2993                #[expect(deprecated)]
2994                let levels = Levels {
2995                    l0: OverlappingLevel {
2996                        sub_levels: vec![
2997                            Level {
2998                                level_idx: 0,
2999                                level_type: LevelType::Overlapping,
3000                                table_infos: vec![
3001                                    make_sst(1, vec![100], 50), // only table 100
3002                                    make_sst(2, vec![200], 60), // only table 200
3003                                ],
3004                                total_file_size: 110,
3005                                uncompressed_file_size: 220,
3006                                sub_level_id: 1,
3007                                ..Default::default()
3008                            },
3009                            Level {
3010                                level_idx: 0,
3011                                level_type: LevelType::Overlapping,
3012                                table_infos: vec![
3013                                    make_sst(3, vec![100], 70), // only table 100
3014                                ],
3015                                total_file_size: 70,
3016                                uncompressed_file_size: 140,
3017                                sub_level_id: 2,
3018                                ..Default::default()
3019                            },
3020                        ],
3021                        total_file_size: 180,
3022                        uncompressed_file_size: 360,
3023                    },
3024                    levels: vec![Level {
3025                        level_idx: 1,
3026                        level_type: LevelType::Nonoverlapping,
3027                        table_infos: vec![
3028                            make_sst(4, vec![100, 200], 80), // shared
3029                            make_sst(5, vec![100], 90),      // only table 100
3030                        ],
3031                        total_file_size: 170,
3032                        uncompressed_file_size: 340,
3033                        ..Default::default()
3034                    }],
3035                    group_id: 1.into(),
3036                    parent_group_id: 0.into(),
3037                    member_table_ids: vec![],
3038                    compaction_group_version_id: 0,
3039                };
3040                levels
3041            })]),
3042            ..Default::default()
3043        };
3044
3045        let version_delta = HummockVersionDelta {
3046            id: HummockVersionId::new(1),
3047            group_deltas: HashMap::from_iter([(
3048                1.into(),
3049                GroupDeltas {
3050                    group_deltas: vec![GroupDelta::PruneTableIdsFromSsts(HashSet::from([
3051                        TableId::new(100),
3052                    ]))],
3053                },
3054            )]),
3055            ..Default::default()
3056        };
3057
3058        version.apply_version_delta(&version_delta);
3059
3060        let cg = version.get_compaction_group_levels(1.into());
3061
3062        assert_eq!(
3063            cg.l0.sub_levels.len(),
3064            1,
3065            "empty sub-level should be removed"
3066        );
3067        assert_eq!(cg.l0.sub_levels[0].sub_level_id, 1);
3068        assert_eq!(cg.l0.sub_levels[0].table_infos.len(), 1);
3069        assert_eq!(2, cg.l0.sub_levels[0].table_infos[0].sst_id);
3070        assert_eq!(
3071            cg.l0.sub_levels[0].table_infos[0].table_ids,
3072            vec![TableId::new(200)]
3073        );
3074
3075        assert_eq!(cg.l0.total_file_size, 60);
3076        assert_eq!(cg.l0.uncompressed_file_size, 120);
3077
3078        assert_eq!(cg.levels[0].table_infos.len(), 1);
3079        assert_eq!(4, cg.levels[0].table_infos[0].sst_id);
3080        assert_eq!(
3081            cg.levels[0].table_infos[0].table_ids,
3082            vec![TableId::new(200)]
3083        );
3084        assert_eq!(cg.levels[0].total_file_size, 80);
3085        assert_eq!(cg.levels[0].uncompressed_file_size, 160);
3086
3087        assert_eq!(cg.compaction_group_version_id, 1);
3088    }
3089
3090    #[test]
3091    fn test_prune_stale_table_ids_from_ssts() {
3092        let live_table_id = TableId::new(100);
3093        let stale_table_id = TableId::new(200);
3094        let mut version = HummockVersion {
3095            id: HummockVersionId::new(0),
3096            levels: HashMap::from_iter([(1.into(), {
3097                #[expect(deprecated)]
3098                let levels = Levels {
3099                    l0: OverlappingLevel {
3100                        sub_levels: vec![Level {
3101                            level_idx: 0,
3102                            level_type: LevelType::Overlapping,
3103                            table_infos: vec![make_sst(
3104                                1,
3105                                vec![live_table_id.as_raw_id(), stale_table_id.as_raw_id()],
3106                                50,
3107                            )],
3108                            total_file_size: 50,
3109                            uncompressed_file_size: 100,
3110                            sub_level_id: 1,
3111                            ..Default::default()
3112                        }],
3113                        total_file_size: 50,
3114                        uncompressed_file_size: 100,
3115                    },
3116                    levels: vec![Level {
3117                        level_idx: 1,
3118                        level_type: LevelType::Nonoverlapping,
3119                        table_infos: vec![make_sst(2, vec![stale_table_id.as_raw_id()], 60)],
3120                        total_file_size: 60,
3121                        uncompressed_file_size: 120,
3122                        ..Default::default()
3123                    }],
3124                    group_id: 1.into(),
3125                    parent_group_id: 0.into(),
3126                    member_table_ids: vec![],
3127                    compaction_group_version_id: 0,
3128                };
3129                levels
3130            })]),
3131            state_table_info: HummockVersionStateTableInfo::from_protobuf_owned(
3132                HashMap::from_iter([(
3133                    live_table_id,
3134                    StateTableInfo {
3135                        committed_epoch: 1,
3136                        compaction_group_id: 1.into(),
3137                    },
3138                )]),
3139            ),
3140            ..Default::default()
3141        };
3142
3143        assert_eq!(version.prune_stale_table_ids_from_ssts(), 1);
3144
3145        let cg = version.get_compaction_group_levels(1.into());
3146        assert_eq!(cg.l0.sub_levels.len(), 1);
3147        assert_eq!(cg.l0.sub_levels[0].table_infos.len(), 1);
3148        assert_eq!(cg.l0.sub_levels[0].table_infos[0].sst_id, 1);
3149        assert_eq!(
3150            cg.l0.sub_levels[0].table_infos[0].table_ids,
3151            vec![live_table_id]
3152        );
3153        assert!(cg.levels[0].table_infos.is_empty());
3154        assert_eq!(cg.compaction_group_version_id, 1);
3155    }
3156
3157    #[test]
3158    fn test_prune_stale_table_ids_from_ssts_skips_legacy_member_table_ids() {
3159        let mut version = HummockVersion {
3160            id: HummockVersionId::new(0),
3161            levels: HashMap::from_iter([(1.into(), {
3162                #[expect(deprecated)]
3163                let levels = Levels {
3164                    l0: OverlappingLevel {
3165                        sub_levels: vec![Level {
3166                            level_idx: 0,
3167                            level_type: LevelType::Overlapping,
3168                            table_infos: vec![make_sst(1, vec![100, 200], 50)],
3169                            total_file_size: 50,
3170                            uncompressed_file_size: 100,
3171                            sub_level_id: 1,
3172                            ..Default::default()
3173                        }],
3174                        total_file_size: 50,
3175                        uncompressed_file_size: 100,
3176                    },
3177                    group_id: 1.into(),
3178                    parent_group_id: 0.into(),
3179                    member_table_ids: vec![100, 200],
3180                    compaction_group_version_id: 0,
3181                    ..Default::default()
3182                };
3183                levels
3184            })]),
3185            ..Default::default()
3186        };
3187
3188        assert_eq!(version.prune_stale_table_ids_from_ssts(), 0);
3189
3190        let cg = version.get_compaction_group_levels(1.into());
3191        assert_eq!(
3192            cg.l0.sub_levels[0].table_infos[0].table_ids,
3193            vec![TableId::new(100), TableId::new(200)]
3194        );
3195        assert_eq!(cg.compaction_group_version_id, 0);
3196    }
3197
3198    #[test]
3199    fn test_apply_version_delta_compact_l0() {
3200        let mut version = HummockVersion {
3201            id: HummockVersionId::new(0),
3202            levels: HashMap::from_iter([(1.into(), {
3203                #[expect(deprecated)]
3204                let levels = Levels {
3205                    l0: OverlappingLevel {
3206                        sub_levels: vec![
3207                            Level {
3208                                level_idx: 0,
3209                                level_type: LevelType::Nonoverlapping,
3210                                table_infos: vec![
3211                                    make_sst(1, vec![1], 100),
3212                                    make_sst(2, vec![2], 200),
3213                                ],
3214                                total_file_size: 300,
3215                                uncompressed_file_size: 600,
3216                                sub_level_id: 1,
3217                                ..Default::default()
3218                            },
3219                            Level {
3220                                level_idx: 0,
3221                                level_type: LevelType::Nonoverlapping,
3222                                table_infos: vec![make_sst(3, vec![3], 300)],
3223                                total_file_size: 300,
3224                                uncompressed_file_size: 600,
3225                                sub_level_id: 2,
3226                                ..Default::default()
3227                            },
3228                        ],
3229                        total_file_size: 600,
3230                        uncompressed_file_size: 1200,
3231                    },
3232                    levels: vec![Level {
3233                        level_idx: 1,
3234                        level_type: LevelType::Nonoverlapping,
3235                        table_infos: vec![],
3236                        total_file_size: 0,
3237                        uncompressed_file_size: 0,
3238                        ..Default::default()
3239                    }],
3240                    group_id: 1.into(),
3241                    parent_group_id: 0.into(),
3242                    member_table_ids: vec![],
3243                    compaction_group_version_id: 0,
3244                };
3245                levels
3246            })]),
3247            ..Default::default()
3248        };
3249
3250        let version_delta = HummockVersionDelta {
3251            id: HummockVersionId::new(1),
3252            group_deltas: HashMap::from_iter([(
3253                1.into(),
3254                GroupDeltas {
3255                    group_deltas: vec![
3256                        GroupDelta::IntraLevel(IntraLevelDelta::new(
3257                            0, // L0
3258                            0,
3259                            HashSet::from([1.into(), 2.into(), 3.into()]),
3260                            vec![],
3261                            0,
3262                            0,
3263                        )),
3264                        GroupDelta::IntraLevel(IntraLevelDelta::new(
3265                            1, // L1
3266                            0,
3267                            HashSet::new(),
3268                            vec![make_sst(10, vec![1, 2, 3], 500)],
3269                            0,
3270                            0,
3271                        )),
3272                    ],
3273                },
3274            )]),
3275            ..Default::default()
3276        };
3277
3278        version.apply_version_delta(&version_delta);
3279
3280        let cg = version.get_compaction_group_levels(1.into());
3281
3282        assert!(cg.l0.sub_levels.is_empty());
3283        assert_eq!(cg.l0.total_file_size, 0);
3284        assert_eq!(cg.l0.uncompressed_file_size, 0);
3285
3286        assert_eq!(cg.levels[0].table_infos.len(), 1);
3287        assert_eq!(10, cg.levels[0].table_infos[0].sst_id);
3288        assert_eq!(cg.levels[0].total_file_size, 500);
3289    }
3290}