// risingwave_hummock_sdk/compaction_group/hummock_version_ext.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::{Arc, LazyLock};
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_common::log::LogSuppressor;
27use risingwave_pb::hummock::{
28    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
29};
30use tracing::warn;
31
32use super::group_split::split_sst_with_table_ids;
33use super::{StateTableId, group_split};
34use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
35use crate::compact_task::is_compaction_task_expired;
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::key_range::KeyRangeCommon;
38use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
41use crate::vector_index::apply_vector_index_delta;
42use crate::version::{
43    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
44    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
45};
46use crate::{
47    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
48};
49
/// Per-compaction-group summary of the SST changes carried by a version delta, as built by
/// `build_sst_delta_infos`.
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    /// Level index the SSTs in `insert_sst_infos` were inserted into (0 for L0).
    pub insert_sst_level: u32,
    /// SSTs inserted by the delta.
    pub insert_sst_infos: Vec<SstableInfo>,
    /// Object ids of the SSTs removed by the delta.
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}

/// Maps a compaction group id to a list of SST ids.
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
58
59impl<L> HummockVersionCommon<SstableInfo, L> {
60    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
61        self.levels
62            .get(&compaction_group_id)
63            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
64    }
65
66    pub fn get_compaction_group_levels_mut(
67        &mut self,
68        compaction_group_id: CompactionGroupId,
69    ) -> &mut Levels {
70        self.levels
71            .get_mut(&compaction_group_id)
72            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
73    }
74
75    // only scan the sst infos from levels in the specified compaction group (without table change log)
76    pub fn get_sst_ids_by_group_id(
77        &self,
78        compaction_group_id: CompactionGroupId,
79    ) -> impl Iterator<Item = HummockSstableId> + '_ {
80        self.levels
81            .iter()
82            .filter_map(move |(cg_id, level)| {
83                if *cg_id == compaction_group_id {
84                    Some(level)
85                } else {
86                    None
87                }
88            })
89            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
90            .flat_map(|level| level.table_infos.iter())
91            .map(|s| s.sst_id)
92    }
93
94    /// Prune stale table ids that no longer exist in `state_table_info` from SST metadata.
95    ///
96    /// This is used to normalize recovered versions from old clusters where dropped table ids
97    /// may still exist in persisted SST metadata.
98    pub fn prune_stale_table_ids_from_ssts(&mut self) -> usize {
99        let live_table_ids: HashSet<_> = self.state_table_info.info().keys().copied().collect();
100        // Older checkpoints may rely on deprecated `member_table_ids` before `state_table_info`
101        // is backfilled.
102        if live_table_ids.is_empty()
103            && self.levels.values().any(|levels| {
104                #[expect(deprecated)]
105                {
106                    !levels.member_table_ids.is_empty()
107                }
108            })
109        {
110            return 0;
111        }
112
113        let mut pruned_table_ids = HashSet::new();
114
115        for levels in self.levels.values_mut() {
116            let stale_table_ids = levels
117                .l0
118                .sub_levels
119                .iter()
120                .chain(levels.levels.iter())
121                .flat_map(|level| level.table_infos.iter())
122                .flat_map(|sst| sst.table_ids.iter().copied())
123                .filter(|table_id| !live_table_ids.contains(table_id))
124                .collect::<HashSet<_>>();
125
126            if stale_table_ids.is_empty() {
127                continue;
128            }
129
130            pruned_table_ids.extend(stale_table_ids.iter().copied());
131            levels.prune_table_ids_from_ssts(&stale_table_ids);
132        }
133
134        pruned_table_ids.len()
135    }
136
137    pub fn level_iter<F: FnMut(&Level) -> bool>(
138        &self,
139        compaction_group_id: CompactionGroupId,
140        mut f: F,
141    ) {
142        if let Some(levels) = self.levels.get(&compaction_group_id) {
143            for sub_level in &levels.l0.sub_levels {
144                if !f(sub_level) {
145                    return;
146                }
147            }
148            for level in &levels.levels {
149                if !f(level) {
150                    return;
151                }
152            }
153        }
154    }
155
156    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
157        // l0 is currently separated from all levels
158        self.levels
159            .get(&compaction_group_id)
160            .map(|group| group.levels.len() + 1)
161            .unwrap_or(0)
162    }
163
164    pub fn safe_epoch_table_watermarks(
165        &self,
166        existing_table_ids: &[TableId],
167    ) -> BTreeMap<TableId, TableWatermarks> {
168        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
169    }
170}
171
172pub fn safe_epoch_table_watermarks_impl(
173    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
174    existing_table_ids: &[TableId],
175) -> BTreeMap<TableId, TableWatermarks> {
176    fn extract_single_table_watermark(
177        table_watermarks: &TableWatermarks,
178    ) -> Option<TableWatermarks> {
179        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
180            Some(TableWatermarks {
181                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
182                direction: table_watermarks.direction,
183                watermark_type: table_watermarks.watermark_type,
184            })
185        } else {
186            None
187        }
188    }
189    table_watermarks
190        .iter()
191        .filter_map(|(table_id, table_watermarks)| {
192            if !existing_table_ids.contains(table_id) {
193                None
194            } else {
195                extract_single_table_watermark(table_watermarks)
196                    .map(|table_watermarks| (*table_id, table_watermarks))
197            }
198        })
199        .collect()
200}
201
202pub fn safe_epoch_read_table_watermarks_impl(
203    safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
204) -> BTreeMap<TableId, ReadTableWatermark> {
205    safe_epoch_watermarks
206        .into_iter()
207        .map(|(table_id, watermarks)| {
208            assert_eq!(watermarks.watermarks.len(), 1);
209            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
210            let mut vnode_watermark_map = BTreeMap::new();
211            for vnode_watermark in vnode_watermarks.iter() {
212                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
213                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
214                    assert!(
215                        vnode_watermark_map
216                            .insert(vnode, watermark.clone())
217                            .is_none(),
218                        "duplicate table watermark on vnode {}",
219                        vnode.to_index()
220                    );
221                }
222            }
223            (
224                table_id,
225                ReadTableWatermark {
226                    direction: watermarks.direction,
227                    vnode_watermarks: vnode_watermark_map,
228                },
229            )
230        })
231        .collect()
232}
233
234impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
235    pub fn count_new_ssts_in_group_split(
236        &self,
237        parent_group_id: CompactionGroupId,
238        split_key: Bytes,
239    ) -> u64 {
240        self.levels
241            .get(&parent_group_id)
242            .map_or(0, |parent_levels| {
243                let l0 = &parent_levels.l0;
244                let mut split_count = 0;
245                for sub_level in &l0.sub_levels {
246                    assert!(!sub_level.table_infos.is_empty());
247
248                    if sub_level.level_type == PbLevelType::Overlapping {
249                        // TODO: use table_id / vnode / key_range filter
250                        split_count += sub_level
251                            .table_infos
252                            .iter()
253                            .map(|sst| {
254                                if let group_split::SstSplitType::Both =
255                                    group_split::need_to_split(sst, split_key.clone())
256                                {
257                                    2
258                                } else {
259                                    0
260                                }
261                            })
262                            .sum::<u64>();
263                        continue;
264                    }
265
266                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
267                    let sst = sub_level.table_infos.get(pos).unwrap();
268
269                    if let group_split::SstSplitType::Both =
270                        group_split::need_to_split(sst, split_key.clone())
271                    {
272                        split_count += 2;
273                    }
274                }
275
276                for level in &parent_levels.levels {
277                    if level.table_infos.is_empty() {
278                        continue;
279                    }
280                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
281                    let sst = level.table_infos.get(pos).unwrap();
282                    if let group_split::SstSplitType::Both =
283                        group_split::need_to_split(sst, split_key.clone())
284                    {
285                        split_count += 2;
286                    }
287                }
288
289                split_count
290            })
291    }
292
293    pub fn init_with_parent_group(
294        &mut self,
295        parent_group_id: CompactionGroupId,
296        group_id: CompactionGroupId,
297        member_table_ids: BTreeSet<StateTableId>,
298        new_sst_start_id: HummockSstableId,
299    ) {
300        let mut new_sst_id = new_sst_start_id;
301        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
302            if new_sst_start_id != 0 {
303                if cfg!(debug_assertions) {
304                    panic!(
305                        "non-zero sst start id {} for NewCompactionGroup",
306                        new_sst_start_id
307                    );
308                } else {
309                    warn!(
310                        %new_sst_start_id,
311                        "non-zero sst start id for NewCompactionGroup"
312                    );
313                }
314            }
315            return;
316        } else if !self.levels.contains_key(&parent_group_id) {
317            unreachable!(
318                "non-existing parent group id {} to init from",
319                parent_group_id
320            );
321        }
322        let [parent_levels, cur_levels] = self
323            .levels
324            .get_disjoint_mut([&parent_group_id, &group_id])
325            .map(|res| res.unwrap());
326        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
327        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
328        parent_levels.compaction_group_version_id += 1;
329        cur_levels.compaction_group_version_id += 1;
330        let l0 = &mut parent_levels.l0;
331        {
332            for sub_level in &mut l0.sub_levels {
333                let target_l0 = &mut cur_levels.l0;
334                // Remove SST from sub level may result in empty sub level. It will be purged
335                // whenever another compaction task is finished.
336                let insert_table_infos =
337                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
338                sub_level.normalize();
339                if insert_table_infos.is_empty() {
340                    continue;
341                }
342                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
343                    Ok(idx) => {
344                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
345                    }
346                    Err(idx) => {
347                        insert_new_sub_level(
348                            target_l0,
349                            sub_level.sub_level_id,
350                            sub_level.level_type,
351                            insert_table_infos,
352                            Some(idx),
353                        );
354                    }
355                }
356            }
357            l0.normalize();
358        }
359        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
360            let insert_table_infos =
361                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
362            cur_levels.levels[idx].total_file_size += insert_table_infos
363                .iter()
364                .map(|sst| sst.sst_size)
365                .sum::<u64>();
366            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
367                .iter()
368                .map(|sst| sst.uncompressed_file_size)
369                .sum::<u64>();
370            cur_levels.levels[idx]
371                .table_infos
372                .extend(insert_table_infos);
373            cur_levels.levels[idx]
374                .table_infos
375                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
376            assert!(can_concat(&cur_levels.levels[idx].table_infos));
377            level.normalize();
378        }
379
380        assert!(
381            parent_levels
382                .l0
383                .sub_levels
384                .iter()
385                .all(|level| !level.table_infos.is_empty())
386        );
387        assert!(
388            cur_levels
389                .l0
390                .sub_levels
391                .iter()
392                .all(|level| !level.table_infos.is_empty())
393        );
394    }
395
396    pub fn build_sst_delta_infos(
397        &self,
398        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
399    ) -> Vec<SstDeltaInfo> {
400        let mut infos = vec![];
401
402        // Skip trivial move delta for refiller
403        // The trivial move task only changes the position of the sst in the lsm, it does not modify the object information corresponding to the sst, and does not need to re-execute the refill.
404        if version_delta.trivial_move {
405            return infos;
406        }
407
408        for (group_id, group_deltas) in &version_delta.group_deltas {
409            let mut info = SstDeltaInfo::default();
410
411            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
412            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
413
414            // Build only if all deltas are intra level deltas.
415            if !group_deltas.group_deltas.iter().all(|delta| {
416                matches!(
417                    delta,
418                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
419                )
420            }) {
421                continue;
422            }
423
424            // TODO(MrCroxx): At most one insert delta is allowed here. It's okay for now with the
425            // current `hummock::manager::gen_version_delta` implementation. Better refactor the
426            // struct to reduce conventions.
427            for group_delta in &group_deltas.group_deltas {
428                match group_delta {
429                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
430                        if !inserted_table_infos.is_empty() {
431                            info.insert_sst_level = 0;
432                            info.insert_sst_infos
433                                .extend(inserted_table_infos.iter().cloned());
434                        }
435                    }
436                    GroupDeltaCommon::IntraLevel(intra_level) => {
437                        if !intra_level.inserted_table_infos.is_empty() {
438                            info.insert_sst_level = intra_level.level_idx;
439                            info.insert_sst_infos
440                                .extend(intra_level.inserted_table_infos.iter().cloned());
441                        }
442                        if !intra_level.removed_table_ids.is_empty() {
443                            for id in &intra_level.removed_table_ids {
444                                if intra_level.level_idx == 0 {
445                                    removed_l0_ssts.insert(*id);
446                                } else {
447                                    removed_ssts
448                                        .entry(intra_level.level_idx)
449                                        .or_default()
450                                        .insert(*id);
451                                }
452                            }
453                        }
454                    }
455                    GroupDeltaCommon::GroupConstruct(_)
456                    | GroupDeltaCommon::GroupDestroy(_)
457                    | GroupDeltaCommon::GroupMerge(_)
458                    | GroupDeltaCommon::PruneTableIdsFromSsts(_) => {}
459                }
460            }
461
462            let group = self.levels.get(group_id).unwrap();
463            for l0_sub_level in &group.level0().sub_levels {
464                for sst_info in &l0_sub_level.table_infos {
465                    if removed_l0_ssts.remove(&sst_info.sst_id) {
466                        info.delete_sst_object_ids.push(sst_info.object_id);
467                    }
468                }
469            }
470            for level in &group.levels {
471                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
472                    for sst_info in &level.table_infos {
473                        if removed_level_ssts.remove(&sst_info.sst_id) {
474                            info.delete_sst_object_ids.push(sst_info.object_id);
475                        }
476                    }
477                    if !removed_level_ssts.is_empty() {
478                        tracing::error!(
479                            "removed_level_ssts is not empty: {:?}",
480                            removed_level_ssts,
481                        );
482                    }
483                    debug_assert!(removed_level_ssts.is_empty());
484                }
485            }
486
487            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
488                tracing::error!(
489                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
490                    removed_l0_ssts,
491                    removed_ssts
492                );
493            }
494            debug_assert!(removed_l0_ssts.is_empty());
495            debug_assert!(removed_ssts.is_empty());
496
497            infos.push(info);
498        }
499
500        infos
501    }
502
503    pub fn apply_version_delta(
504        &mut self,
505        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
506    ) -> HashMap<TableId, Option<StateTableInfo>> {
507        assert_eq!(self.id, version_delta.prev_id);
508
509        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
510            &version_delta.state_table_info_delta,
511            &version_delta.removed_table_ids,
512        );
513
514        #[expect(deprecated)]
515        {
516            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
517                is_commit_epoch = true;
518                tracing::trace!(
519                    "max committed epoch bumped but no table committed epoch is changed"
520                );
521            }
522        }
523
524        // apply to `levels`, which is different compaction groups
525        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
526            let mut is_applied_l0_compact = false;
527            for group_delta in &group_deltas.group_deltas {
528                match group_delta {
529                    GroupDeltaCommon::GroupConstruct(group_construct) => {
530                        let mut new_levels = build_initial_compaction_group_levels(
531                            *compaction_group_id,
532                            group_construct.get_group_config().unwrap(),
533                        );
534                        let parent_group_id = group_construct.parent_group_id;
535                        new_levels.parent_group_id = parent_group_id;
536                        #[expect(deprecated)]
537                        // for backward-compatibility of previous hummock version delta
538                        new_levels
539                            .member_table_ids
540                            .clone_from(&group_construct.table_ids);
541                        self.levels.insert(*compaction_group_id, new_levels);
542                        let member_table_ids = if group_construct.version()
543                            >= CompatibilityVersion::NoMemberTableIds
544                        {
545                            self.state_table_info
546                                .compaction_group_member_table_ids(*compaction_group_id)
547                                .iter()
548                                .copied()
549                                .collect()
550                        } else {
551                            #[expect(deprecated)]
552                            // for backward-compatibility of previous hummock version delta
553                            BTreeSet::from_iter(
554                                group_construct.table_ids.iter().copied().map(Into::into),
555                            )
556                        };
557
558                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
559                            let split_key = if group_construct.split_key.is_some() {
560                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
561                            } else {
562                                None
563                            };
564                            self.init_with_parent_group_v2(
565                                parent_group_id,
566                                *compaction_group_id,
567                                group_construct.new_sst_start_id,
568                                split_key.clone(),
569                            );
570                        } else {
571                            // for backward-compatibility of previous hummock version delta
572                            self.init_with_parent_group(
573                                parent_group_id,
574                                *compaction_group_id,
575                                member_table_ids,
576                                group_construct.new_sst_start_id,
577                            );
578                        }
579                    }
580                    GroupDeltaCommon::GroupMerge(group_merge) => {
581                        tracing::info!(
582                            "group_merge left {:?} right {:?}",
583                            group_merge.left_group_id,
584                            group_merge.right_group_id
585                        );
586                        self.merge_compaction_group(
587                            group_merge.left_group_id,
588                            group_merge.right_group_id,
589                        )
590                    }
591                    GroupDeltaCommon::IntraLevel(level_delta) => {
592                        let levels =
593                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
594                                panic!("compaction group {} does not exist", compaction_group_id)
595                            });
596                        if is_commit_epoch {
597                            assert!(
598                                level_delta.removed_table_ids.is_empty(),
599                                "no sst should be deleted when committing an epoch"
600                            );
601
602                            let IntraLevelDelta {
603                                level_idx,
604                                l0_sub_level_id,
605                                inserted_table_infos,
606                                ..
607                            } = level_delta;
608                            {
609                                assert_eq!(
610                                    *level_idx, 0,
611                                    "we should only add to L0 when we commit an epoch."
612                                );
613                                if !inserted_table_infos.is_empty() {
614                                    insert_new_sub_level(
615                                        &mut levels.l0,
616                                        *l0_sub_level_id,
617                                        PbLevelType::Overlapping,
618                                        inserted_table_infos.clone(),
619                                        None,
620                                    );
621                                }
622                            }
623                        } else {
624                            // The delta is caused by compaction.
625                            levels.apply_compact_ssts(
626                                level_delta,
627                                self.state_table_info
628                                    .compaction_group_member_table_ids(*compaction_group_id),
629                            );
630                            if level_delta.level_idx == 0 {
631                                is_applied_l0_compact = true;
632                            }
633                        }
634                    }
635                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
636                        let levels =
637                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
638                                panic!("compaction group {} does not exist", compaction_group_id)
639                            });
640                        assert!(is_commit_epoch);
641
642                        if !inserted_table_infos.is_empty() {
643                            let next_l0_sub_level_id = levels
644                                .l0
645                                .sub_levels
646                                .last()
647                                .map(|level| level.sub_level_id + 1)
648                                .unwrap_or(1);
649
650                            insert_new_sub_level(
651                                &mut levels.l0,
652                                next_l0_sub_level_id,
653                                PbLevelType::Overlapping,
654                                inserted_table_infos.clone(),
655                                None,
656                            );
657                        }
658                    }
659                    GroupDeltaCommon::GroupDestroy(_) => {
660                        self.levels.remove(compaction_group_id);
661                    }
662
663                    GroupDeltaCommon::PruneTableIdsFromSsts(table_ids) => {
664                        self.levels
665                            .get_mut(compaction_group_id)
666                            .unwrap_or_else(|| {
667                                panic!("compaction group {} does not exist", compaction_group_id)
668                            })
669                            .prune_table_ids_from_ssts(table_ids);
670                    }
671                }
672            }
673
674            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
675            {
676                levels.l0.normalize();
677            }
678        }
679        self.id = version_delta.id;
680        #[expect(deprecated)]
681        {
682            self.max_committed_epoch = version_delta.max_committed_epoch;
683        }
684
685        // apply to table watermark
686
687        // Store the table watermarks that needs to be updated. None means to remove the table watermark of the table id
688        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
689            HashMap::new();
690
691        // apply to table watermark
692        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
693            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
694                if version_delta.removed_table_ids.contains(table_id) {
695                    modified_table_watermarks.insert(*table_id, None);
696                } else {
697                    let mut current_table_watermarks = (**current_table_watermarks).clone();
698                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
699                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
700                }
701            } else {
702                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
703            }
704        }
705        for (table_id, table_watermarks) in &self.table_watermarks {
706            let safe_epoch = if let Some(state_table_info) =
707                self.state_table_info.info().get(table_id)
708                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
709                && state_table_info.committed_epoch > *oldest_epoch
710            {
711                // safe epoch has progressed, need further clear.
712                state_table_info.committed_epoch
713            } else {
714                // safe epoch not progressed or the table has been removed. No need to truncate
715                continue;
716            };
717            let table_watermarks = modified_table_watermarks
718                .entry(*table_id)
719                .or_insert_with(|| Some((**table_watermarks).clone()));
720            if let Some(table_watermarks) = table_watermarks {
721                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
722            }
723        }
724        // apply the staging table watermark to hummock version
725        for (table_id, table_watermarks) in modified_table_watermarks {
726            if let Some(table_watermarks) = table_watermarks {
727                self.table_watermarks
728                    .insert(table_id, Arc::new(table_watermarks));
729            } else {
730                self.table_watermarks.remove(&table_id);
731            }
732        }
733        // apply to vector index
734        apply_vector_index_delta(
735            &mut self.vector_indexes,
736            &version_delta.vector_index_delta,
737            &version_delta.removed_table_ids,
738        );
739
740        changed_table_info
741    }
742
743    pub fn apply_change_log_delta<T: Clone>(
744        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
745        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
746    ) {
747        for (table_id, change_log_delta) in change_log_delta {
748            let new_change_log = &change_log_delta.new_log;
749            match table_change_log.entry(*table_id) {
750                Entry::Occupied(entry) => {
751                    let change_log = entry.into_mut();
752                    change_log.add_change_log(new_change_log.clone());
753                }
754                Entry::Vacant(entry) => {
755                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
756                }
757            };
758        }
759
760        // truncate the remaining table change log
761        for (table_id, change_log_delta) in change_log_delta {
762            if let Some(change_log) = table_change_log.get_mut(table_id) {
763                change_log.truncate(change_log_delta.truncate_epoch);
764            }
765        }
766    }
767
768    /// Returns the log deltas required to truncate the entire change log for a table.
769    pub fn collect_gc_change_log_delta<'a, T: Clone>(
770        current_change_log_table_ids: impl Iterator<Item = &'a TableId>,
771        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
772        removed_table_ids: &HashSet<TableId>,
773        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
774        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
775    ) -> HashSet<TableId> {
776        let mut gc_change_log_delta = HashSet::new();
777        // If a table has no new change log entry (even an empty one), it means we have stopped maintained
778        // the change log for the table, and then we will remove the table change log.
779        // The table change log will also be removed when the table id is removed.
780        for table_id in current_change_log_table_ids {
781            if removed_table_ids.contains(table_id) {
782                gc_change_log_delta.insert(*table_id);
783                continue;
784            }
785            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
786                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
787                && table_info_delta.committed_epoch > prev_table_info.committed_epoch
788            {
789                // the table exists previously, and its committed epoch has progressed.
790            } else {
791                // otherwise, the table change log should be kept anyway
792                continue;
793            }
794            let contains = change_log_delta.contains_key(table_id);
795            if !contains {
796                gc_change_log_delta.insert(*table_id);
797                static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
798                    LazyLock::new(|| LogSuppressor::per_second(1));
799                if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
800                    warn!(
801                        suppressed_count,
802                        %table_id,
803                        "table change log dropped due to no further change log at newly committed epoch"
804                    );
805                }
806            }
807        }
808        gc_change_log_delta
809    }
810
811    pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
812        let mut ret: BTreeMap<_, _> = BTreeMap::new();
813        for (compaction_group_id, group) in &self.levels {
814            let mut levels = vec![];
815            levels.extend(group.l0.sub_levels.iter());
816            levels.extend(group.levels.iter());
817            for level in levels {
818                for table_info in &level.table_infos {
819                    if table_info.sst_id.as_raw_id() == table_info.object_id.as_raw_id() {
820                        continue;
821                    }
822                    let object_id = table_info.object_id;
823                    let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
824                    entry
825                        .entry(*compaction_group_id)
826                        .or_default()
827                        .push(table_info.sst_id)
828                }
829            }
830        }
831        ret
832    }
833
834    pub fn merge_compaction_group(
835        &mut self,
836        left_group_id: CompactionGroupId,
837        right_group_id: CompactionGroupId,
838    ) {
839        // Double check
840        let left_group_id_table_ids = self
841            .state_table_info
842            .compaction_group_member_table_ids(left_group_id)
843            .iter();
844        let right_group_id_table_ids = self
845            .state_table_info
846            .compaction_group_member_table_ids(right_group_id)
847            .iter();
848
849        assert!(
850            left_group_id_table_ids
851                .chain(right_group_id_table_ids)
852                .is_sorted()
853        );
854
855        let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
856        let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
857            panic!(
858                "compaction group should exist right {} all {:?}",
859                right_group_id, total_cg
860            )
861        });
862
863        let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
864            panic!(
865                "compaction group should exist left {} all {:?}",
866                left_group_id, total_cg
867            )
868        });
869
870        group_split::merge_levels(left_levels, right_levels);
871    }
872
    /// Initializes compaction group `group_id` by moving SSTs out of `parent_group_id`, split at
    /// `split_key`.
    ///
    /// - If `parent_group_id` is `StaticCompactionGroupId::NewCompactionGroup`, nothing is moved
    ///   and the function returns immediately. In that case a non-zero `new_sst_start_id` panics
    ///   in debug builds and only logs a warning otherwise.
    /// - Otherwise both groups' `compaction_group_version_id` are bumped. Then, for every L0
    ///   sub-level and every non-L0 level of the parent, `group_split::split_sst_info_for_level_v2`
    ///   returns the SSTs to move into the child. These are placed into a matching/new child L0
    ///   sub-level, or appended (and re-sorted) into the child level with the same index.
    /// - When `split_key` is `None`, no SSTs are moved, but the version ids are still bumped.
    ///
    /// `new_sst_start_id` seeds a counter that is passed by `&mut` to the split helper
    /// (presumably to allocate ids for newly created SSTs — confirm in `group_split`).
    ///
    /// # Panics
    ///
    /// - if `parent_group_id` is not present in `self.levels` (`unreachable!`);
    /// - if `group_id` is not present in `self.levels` (`unwrap` on `get_disjoint_mut`);
    ///   NOTE(review): `get_disjoint_mut` also panics if the two ids are equal;
    /// - if a child non-L0 level stops being concat-able, or either L0 ends up with an empty
    ///   sub-level.
    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: HummockSstableId,
        split_key: Option<Bytes>,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
            // A brand-new group has no parent SSTs to split, so no SST ids should be reserved.
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operations, e.g. split, any ongoing compaction tasks created prior to them should be rejected due to expiration.
        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        // Move the right-hand part of each parent L0 sub-level into the child's L0.
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Splitting may leave `sub_level` without SSTs; such empty sub-levels are
                // dropped by `l0.normalize()` after this loop.
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                if insert_table_infos.is_empty() {
                    continue;
                }

                // Drop SSTs left without table ids and recompute this parent sub-level's sizes.
                sub_level.normalize();
                // `Ok(idx)`: merge into the existing child sub-level at `idx`;
                // `Err(idx)`: create a new child sub-level (same id and type as the parent's) at `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }
            // Purge emptied parent sub-levels and recompute the parent L0's aggregate sizes.
            l0.normalize();
        }

        // Non-L0 levels: the child level at the same index receives the moved SSTs.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Keep the child level ordered by key range and verify it is still non-overlapping.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level.normalize();
        }

        // Invariant: neither L0 may contain an empty sub-level after the split.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
995}
996
997impl<T> HummockVersionCommon<T>
998where
999    T: SstableIdReader + ObjectIdReader,
1000{
1001    pub fn get_object_ids(&self) -> impl Iterator<Item = HummockObjectId> + '_ {
1002        // DO NOT REMOVE THIS LINE
1003        // This is to ensure that when adding new variant to `HummockObjectId`,
1004        // the compiler will warn us if we forget to handle it here.
1005        match HummockObjectId::Sstable(0.into()) {
1006            HummockObjectId::Sstable(_) => {}
1007            HummockObjectId::VectorFile(_) => {}
1008            HummockObjectId::HnswGraphFile(_) => {}
1009        };
1010        self.get_sst_infos()
1011            .map(|s| HummockObjectId::Sstable(s.object_id()))
1012            .chain(
1013                self.vector_indexes
1014                    .values()
1015                    .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
1016            )
1017    }
1018
1019    pub fn get_sst_ids(&self) -> HashSet<HummockSstableId> {
1020        self.get_sst_infos().map(|s| s.sst_id()).collect()
1021    }
1022
1023    pub fn get_sst_infos(&self) -> impl Iterator<Item = &T> {
1024        self.get_combined_levels()
1025            .flat_map(|level| level.table_infos.iter())
1026    }
1027}
1028
impl Levels {
    /// Applies one compaction-generated `IntraLevelDelta` to these levels.
    ///
    /// - If the delta's `compaction_group_version_id` is judged expired relative to this group's
    ///   (via `is_compaction_task_expired`), a warning is logged and nothing is changed.
    /// - Otherwise, the SSTs whose ids appear in the delta's `removed_table_ids` field are
    ///   deleted. Despite its name, that field holds SST ids here (it is passed to
    ///   `Level::delete_ssts`). For `level_idx == 0` they are deleted from every L0 sub-level;
    ///   otherwise from `levels[level_idx - 1]`.
    /// - Then the delta's inserted SSTs are merged into the L0 sub-level whose id equals
    ///   `l0_sub_level_id` (which must exist), or into `levels[level_idx - 1]`.
    ///
    /// `vnode_partition_count` of the target level is updated only under the conditions spelled
    /// out inline below; `member_table_ids` is the set of tables in this group.
    ///
    /// This method does not update `self.l0`'s aggregate sizes, and it does not remove L0
    /// sub-levels emptied by the deletion. NOTE(review): callers presumably normalize L0
    /// afterwards — confirm at call sites.
    ///
    /// # Panics
    ///
    /// Panics if an L0 insert targets a sub-level id that does not exist, or if the resulting
    /// level fails the concat-ability checks in `level_insert_ssts`.
    pub(crate) fn apply_compact_ssts(
        &mut self,
        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
        member_table_ids: &BTreeSet<TableId>,
    ) {
        let IntraLevelDeltaCommon {
            level_idx,
            l0_sub_level_id,
            inserted_table_infos: insert_table_infos,
            vnode_partition_count,
            removed_table_ids: delete_sst_ids_set,
            compaction_group_version_id,
        } = level_delta;
        let new_vnode_partition_count = *vnode_partition_count;

        // Reject deltas produced by compaction tasks created before a group-level operation
        // (e.g. split) bumped this group's version id.
        if is_compaction_task_expired(
            self.compaction_group_version_id,
            *compaction_group_version_id,
        ) {
            warn!(
                current_compaction_group_version_id = self.compaction_group_version_id,
                delta_compaction_group_version_id = compaction_group_version_id,
                level_idx,
                l0_sub_level_id,
                insert_table_infos = ?insert_table_infos
                    .iter()
                    .map(|sst| (sst.sst_id, sst.object_id))
                    .collect_vec(),
                ?delete_sst_ids_set,
                "This VersionDelta may be committed by an expired compact task. Please check it."
            );
            return;
        }
        if !delete_sst_ids_set.is_empty() {
            if *level_idx == 0 {
                // The compacted input SSTs may come from any L0 sub-level.
                for level in &mut self.l0.sub_levels {
                    level.delete_ssts(delete_sst_ids_set);
                }
            } else {
                // Non-L0 levels are stored 0-based: level N lives at `levels[N - 1]`.
                let idx = *level_idx as usize - 1;
                self.levels[idx].delete_ssts(delete_sst_ids_set);
            }
        }

        if !insert_table_infos.is_empty() {
            let insert_sst_level_id = *level_idx;
            let insert_sub_level_id = *l0_sub_level_id;
            if insert_sst_level_id == 0 {
                let l0 = &mut self.l0;
                // Binary search for the target sub-level; this relies on `sub_levels` being
                // sorted by `sub_level_id`.
                let index = l0
                    .sub_levels
                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
                assert!(
                    index < l0.sub_levels.len()
                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
                    "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
                    insert_sub_level_id,
                    delete_sst_ids_set,
                    l0.sub_levels
                        .iter()
                        .map(|level| level.sub_level_id)
                        .collect_vec()
                );
                if l0.sub_levels[index].table_infos.is_empty()
                    && member_table_ids.len() == 1
                    && insert_table_infos.iter().all(|sst| {
                        sst.table_ids.len() == 1
                            && sst.table_ids[0]
                                == *member_table_ids.iter().next().expect("non-empty")
                    })
                {
                    // Only change vnode_partition_count for group which has only one state-table.
                    // Only change vnode_partition_count for level which update all sst files in this compact task.
                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
                }
                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
            } else {
                let idx = insert_sst_level_id as usize - 1;
                // Adopt the task's partition count when the whole level is being rewritten with
                // single-table SSTs; reset it to 0 when a multi-table group's task reports no
                // partitioning for a level that was partitioned.
                if self.levels[idx].table_infos.is_empty()
                    && insert_table_infos
                        .iter()
                        .all(|sst| sst.table_ids.len() == 1)
                {
                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
                } else if self.levels[idx].vnode_partition_count != 0
                    && new_vnode_partition_count == 0
                    && member_table_ids.len() > 1
                {
                    self.levels[idx].vnode_partition_count = 0;
                }
                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
            }
        }
    }

    /// Prune specified table ids from all SST metadata, remove emptied SSTs and sub-levels,
    /// then bump `compaction_group_version_id`.
    pub(crate) fn prune_table_ids_from_ssts(&mut self, table_ids: &HashSet<TableId>) {
        for level in self.l0.sub_levels.iter_mut().chain(self.levels.iter_mut()) {
            level.prune_table_ids_from_ssts(table_ids);
        }
        // Drop L0 sub-levels that lost all their SSTs and refresh L0's aggregate sizes.
        self.l0.normalize();
        // Bumping the version id makes compaction tasks created before the prune expire.
        self.compaction_group_version_id += 1;
    }
}
1135
1136impl<T, L> HummockVersionCommon<T, L> {
1137    pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1138        self.levels
1139            .values()
1140            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1141    }
1142}
1143
1144pub fn build_initial_compaction_group_levels(
1145    group_id: impl Into<CompactionGroupId>,
1146    compaction_config: &CompactionConfig,
1147) -> Levels {
1148    let mut levels = vec![];
1149    for l in 0..compaction_config.get_max_level() {
1150        levels.push(Level {
1151            level_idx: (l + 1) as u32,
1152            level_type: PbLevelType::Nonoverlapping,
1153            table_infos: vec![],
1154            total_file_size: 0,
1155            sub_level_id: 0,
1156            uncompressed_file_size: 0,
1157            vnode_partition_count: 0,
1158        });
1159    }
1160    #[expect(deprecated)] // for backward-compatibility of previous hummock version delta
1161    Levels {
1162        levels,
1163        l0: OverlappingLevel {
1164            sub_levels: vec![],
1165            total_file_size: 0,
1166            uncompressed_file_size: 0,
1167        },
1168        group_id: group_id.into(),
1169        parent_group_id: 0.into(),
1170        member_table_ids: vec![],
1171        compaction_group_version_id: 0,
1172    }
1173}
1174
1175fn split_sst_info_for_level(
1176    member_table_ids: &BTreeSet<TableId>,
1177    level: &mut Level,
1178    new_sst_id: &mut HummockSstableId,
1179) -> Vec<SstableInfo> {
1180    // Remove SST from sub level may result in empty sub level. It will be purged
1181    // whenever another compaction task is finished.
1182    let mut insert_table_infos = vec![];
1183    for sst_info in &mut level.table_infos {
1184        let removed_table_ids = sst_info
1185            .table_ids
1186            .iter()
1187            .filter(|table_id| member_table_ids.contains(*table_id))
1188            .cloned()
1189            .collect_vec();
1190        let sst_size = sst_info.sst_size;
1191        if sst_size / 2 == 0 {
1192            tracing::warn!(
1193                id = %sst_info.sst_id,
1194                object_id = %sst_info.object_id,
1195                sst_size = sst_info.sst_size,
1196                file_size = sst_info.file_size,
1197                "Sstable sst_size is under expected",
1198            );
1199        };
1200        if !removed_table_ids.is_empty() {
1201            let (modified_sst, branch_sst) = split_sst_with_table_ids(
1202                sst_info,
1203                new_sst_id,
1204                sst_size / 2,
1205                sst_size / 2,
1206                member_table_ids.iter().cloned().collect_vec(),
1207            );
1208            *sst_info = modified_sst;
1209            insert_table_infos.push(branch_sst);
1210        }
1211    }
1212    insert_table_infos
1213}
1214
1215/// Gets all compaction group ids.
1216pub fn get_compaction_group_ids(
1217    version: &HummockVersion,
1218) -> impl Iterator<Item = CompactionGroupId> + '_ {
1219    version.levels.keys().cloned()
1220}
1221
1222pub fn get_table_compaction_group_id_mapping(
1223    version: &HummockVersion,
1224) -> HashMap<StateTableId, CompactionGroupId> {
1225    version
1226        .state_table_info
1227        .info()
1228        .iter()
1229        .map(|(table_id, info)| (*table_id, info.compaction_group_id))
1230        .collect()
1231}
1232
1233/// Gets all SSTs in `group_id`
1234pub fn get_compaction_group_ssts(
1235    version: &HummockVersion,
1236    group_id: CompactionGroupId,
1237) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1238    let group_levels = version.get_compaction_group_levels(group_id);
1239    group_levels
1240        .l0
1241        .sub_levels
1242        .iter()
1243        .rev()
1244        .chain(group_levels.levels.iter())
1245        .flat_map(|level| {
1246            level
1247                .table_infos
1248                .iter()
1249                .map(|table_info| (table_info.object_id, table_info.sst_id))
1250        })
1251}
1252
1253pub fn new_sub_level(
1254    sub_level_id: u64,
1255    level_type: PbLevelType,
1256    table_infos: Vec<SstableInfo>,
1257) -> Level {
1258    if level_type == PbLevelType::Nonoverlapping {
1259        debug_assert!(
1260            can_concat(&table_infos),
1261            "sst of non-overlapping level is not concat-able: {:?}",
1262            table_infos
1263        );
1264    }
1265    let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1266    let uncompressed_file_size = table_infos
1267        .iter()
1268        .map(|table| table.uncompressed_file_size)
1269        .sum();
1270    Level {
1271        level_idx: 0,
1272        level_type,
1273        table_infos,
1274        total_file_size,
1275        sub_level_id,
1276        uncompressed_file_size,
1277        vnode_partition_count: 0,
1278    }
1279}
1280
1281pub fn add_ssts_to_sub_level(
1282    l0: &mut OverlappingLevel,
1283    sub_level_idx: usize,
1284    insert_table_infos: Vec<SstableInfo>,
1285) {
1286    insert_table_infos.iter().for_each(|sst| {
1287        l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1288        l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1289        l0.total_file_size += sst.sst_size;
1290        l0.uncompressed_file_size += sst.uncompressed_file_size;
1291    });
1292    l0.sub_levels[sub_level_idx]
1293        .table_infos
1294        .extend(insert_table_infos);
1295    if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1296        l0.sub_levels[sub_level_idx]
1297            .table_infos
1298            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1299        assert!(
1300            can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1301            "sstable ids: {:?}",
1302            l0.sub_levels[sub_level_idx]
1303                .table_infos
1304                .iter()
1305                .map(|sst| sst.sst_id)
1306                .collect_vec()
1307        );
1308    }
1309}
1310
1311/// `None` value of `sub_level_insert_hint` means append.
1312pub fn insert_new_sub_level(
1313    l0: &mut OverlappingLevel,
1314    insert_sub_level_id: u64,
1315    level_type: PbLevelType,
1316    insert_table_infos: Vec<SstableInfo>,
1317    sub_level_insert_hint: Option<usize>,
1318) {
1319    if insert_sub_level_id == u64::MAX {
1320        return;
1321    }
1322    let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1323        insert_pos
1324    } else {
1325        if let Some(newest_level) = l0.sub_levels.last() {
1326            assert!(
1327                newest_level.sub_level_id < insert_sub_level_id,
1328                "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1329                newest_level.sub_level_id,
1330                insert_sub_level_id,
1331                l0,
1332            );
1333        }
1334        l0.sub_levels.len()
1335    };
1336    #[cfg(debug_assertions)]
1337    {
1338        if insert_pos > 0
1339            && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1340        {
1341            debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1342        }
1343        if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1344            debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1345        }
1346    }
1347    // All files will be committed in one new Overlapping sub-level and become
1348    // Nonoverlapping  after at least one compaction.
1349    let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1350    l0.total_file_size += level.total_file_size;
1351    l0.uncompressed_file_size += level.uncompressed_file_size;
1352    l0.sub_levels.insert(insert_pos, level);
1353}
1354
1355impl Level {
1356    fn recompute_size(&mut self) {
1357        self.total_file_size = self
1358            .table_infos
1359            .iter()
1360            .map(|table| table.sst_size)
1361            .sum::<u64>();
1362        self.uncompressed_file_size = self
1363            .table_infos
1364            .iter()
1365            .map(|table| table.uncompressed_file_size)
1366            .sum::<u64>();
1367    }
1368
1369    /// Remove SSTs with empty `table_ids`, then recompute sizes.
1370    fn normalize(&mut self) {
1371        self.table_infos
1372            .retain(|sst_info| !sst_info.table_ids.is_empty());
1373        self.recompute_size();
1374    }
1375
1376    /// Return `true` if any SST was actually removed.
1377    fn delete_ssts(&mut self, ids: &HashSet<HummockSstableId>) -> bool {
1378        let original_len = self.table_infos.len();
1379        self.table_infos
1380            .retain(|table| !ids.contains(&table.sst_id));
1381        self.recompute_size();
1382        original_len != self.table_infos.len()
1383    }
1384
1385    /// Prune specified `table_ids` from each SST's metadata,
1386    /// remove SSTs that become empty, then recompute sizes.
1387    fn prune_table_ids_from_ssts(&mut self, table_ids: &HashSet<TableId>) {
1388        for sstable_info in &mut self.table_infos {
1389            if !sstable_info
1390                .table_ids
1391                .iter()
1392                .any(|table_id| table_ids.contains(table_id))
1393            {
1394                continue;
1395            }
1396
1397            let mut inner = sstable_info.get_inner();
1398            inner.table_ids.retain(|id| !table_ids.contains(id));
1399            sstable_info.set_inner(inner);
1400        }
1401        self.normalize();
1402    }
1403}
1404
1405impl OverlappingLevel {
1406    /// Remove empty sub-levels, then recompute aggregated sizes.
1407    fn normalize(&mut self) {
1408        self.sub_levels
1409            .retain(|level| !level.table_infos.is_empty());
1410        self.total_file_size = self
1411            .sub_levels
1412            .iter()
1413            .map(|level| level.total_file_size)
1414            .sum::<u64>();
1415        self.uncompressed_file_size = self
1416            .sub_levels
1417            .iter()
1418            .map(|level| level.uncompressed_file_size)
1419            .sum::<u64>();
1420    }
1421}
1422
1423fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1424    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1425        format!(
1426            "sstable ids: {:?}",
1427            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1428        )
1429    }
1430    operand.total_file_size += insert_table_infos
1431        .iter()
1432        .map(|sst| sst.sst_size)
1433        .sum::<u64>();
1434    operand.uncompressed_file_size += insert_table_infos
1435        .iter()
1436        .map(|sst| sst.uncompressed_file_size)
1437        .sum::<u64>();
1438    if operand.level_type == PbLevelType::Overlapping {
1439        operand.level_type = PbLevelType::Nonoverlapping;
1440        operand
1441            .table_infos
1442            .extend(insert_table_infos.iter().cloned());
1443        operand
1444            .table_infos
1445            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1446        assert!(
1447            can_concat(&operand.table_infos),
1448            "{}",
1449            display_sstable_infos(&operand.table_infos)
1450        );
1451    } else if !insert_table_infos.is_empty() {
1452        let sorted_insert: Vec<_> = insert_table_infos
1453            .iter()
1454            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1455            .cloned()
1456            .collect();
1457        let first = &sorted_insert[0];
1458        let last = &sorted_insert[sorted_insert.len() - 1];
1459        let pos = operand
1460            .table_infos
1461            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1462        if pos >= operand.table_infos.len()
1463            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1464        {
1465            operand.table_infos.splice(pos..pos, sorted_insert);
1466            // Validate the inserted SST batch along with the two SSTs that precede and follow it.
1467            let validate_range = operand
1468                .table_infos
1469                .iter()
1470                .skip(pos.saturating_sub(1))
1471                .take(insert_table_infos.len() + 2)
1472                .collect_vec();
1473            assert!(
1474                can_concat(&validate_range),
1475                "{}",
1476                display_sstable_infos(&validate_range),
1477            );
1478        } else {
1479            // If this branch is reached, it indicates some unexpected behavior in compaction.
1480            // Here we issue a warning and fall back to insert one by one.
1481            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1482            for i in insert_table_infos {
1483                let pos = operand
1484                    .table_infos
1485                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1486                operand.table_infos.insert(pos, i.clone());
1487            }
1488            assert!(
1489                can_concat(&operand.table_infos),
1490                "{}",
1491                display_sstable_infos(&operand.table_infos)
1492            );
1493        }
1494    }
1495}
1496
1497pub fn version_object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1498    // DO NOT REMOVE THIS LINE
1499    // This is to ensure that when adding new variant to `HummockObjectId`,
1500    // the compiler will warn us if we forget to handle it here.
1501    match HummockObjectId::Sstable(0.into()) {
1502        HummockObjectId::Sstable(_) => {}
1503        HummockObjectId::VectorFile(_) => {}
1504        HummockObjectId::HnswGraphFile(_) => {}
1505    };
1506    version
1507        .levels
1508        .values()
1509        .flat_map(|cg| {
1510            cg.level0()
1511                .sub_levels
1512                .iter()
1513                .chain(cg.levels.iter())
1514                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1515        })
1516        .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1517        .chain(
1518            version
1519                .vector_indexes
1520                .values()
1521                .flat_map(|index| index.get_objects()),
1522        )
1523        .collect()
1524}
1525
1526/// Verify the validity of a `HummockVersion` and return a list of violations if any.
1527/// Currently this method is only used by risectl validate-version.
1528pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1529    let mut res = Vec::new();
1530    // Ensure each table maps to only one compaction group
1531    for (group_id, levels) in &version.levels {
1532        // Ensure compaction group id matches
1533        if levels.group_id != *group_id {
1534            res.push(format!(
1535                "GROUP {}: inconsistent group id {} in Levels",
1536                group_id, levels.group_id
1537            ));
1538        }
1539
1540        let validate_level = |group: CompactionGroupId,
1541                              expected_level_idx: u32,
1542                              level: &Level,
1543                              res: &mut Vec<String>| {
1544            let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1545            if level.level_idx == 0 {
1546                level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1547                // Ensure sub-level is not empty
1548                if level.table_infos.is_empty() {
1549                    res.push(format!("{}: empty level", level_identifier));
1550                }
1551            } else if level.level_type != PbLevelType::Nonoverlapping {
1552                // Ensure non-L0 level is non-overlapping level
1553                res.push(format!(
1554                    "{}: level type {:?} is not non-overlapping",
1555                    level_identifier, level.level_type
1556                ));
1557            }
1558
1559            // Ensure level idx matches
1560            if level.level_idx != expected_level_idx {
1561                res.push(format!(
1562                    "{}: mismatched level idx {}",
1563                    level_identifier, expected_level_idx
1564                ));
1565            }
1566
1567            let mut prev_table_info: Option<&SstableInfo> = None;
1568            for table_info in &level.table_infos {
1569                // Ensure table_ids are sorted and unique
1570                if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1571                    res.push(format!(
1572                        "{} SST {}: table_ids not sorted",
1573                        level_identifier, table_info.object_id
1574                    ));
1575                }
1576
1577                // Ensure SSTs in non-overlapping level have non-overlapping key range
1578                if level.level_type == PbLevelType::Nonoverlapping {
1579                    if let Some(prev) = prev_table_info.take()
1580                        && prev
1581                            .key_range
1582                            .compare_right_with(&table_info.key_range.left)
1583                            != Ordering::Less
1584                    {
1585                        res.push(format!(
1586                            "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1587                            level_identifier, table_info.object_id, prev, table_info
1588                        ));
1589                    }
1590                    let _ = prev_table_info.insert(table_info);
1591                }
1592            }
1593        };
1594
1595        let l0 = &levels.l0;
1596        let mut prev_sub_level_id = u64::MAX;
1597        for sub_level in &l0.sub_levels {
1598            // Ensure sub_level_id is sorted and unique
1599            if sub_level.sub_level_id >= prev_sub_level_id {
1600                res.push(format!(
1601                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1602                    group_id, sub_level.level_idx, prev_sub_level_id
1603                ));
1604            }
1605            prev_sub_level_id = sub_level.sub_level_id;
1606
1607            validate_level(*group_id, 0, sub_level, &mut res);
1608        }
1609
1610        for idx in 1..=levels.levels.len() {
1611            validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1612        }
1613    }
1614    res
1615}
1616
1617#[cfg(test)]
1618mod tests {
1619    use std::collections::{HashMap, HashSet};
1620
1621    use bytes::Bytes;
1622    use risingwave_common::catalog::TableId;
1623    use risingwave_common::hash::VirtualNode;
1624    use risingwave_common::util::epoch::test_epoch;
1625    use risingwave_pb::hummock::{
1626        CompactionConfig, GroupConstruct, GroupDestroy, LevelType, StateTableInfo,
1627    };
1628
1629    use super::group_split;
1630    use crate::HummockVersionId;
1631    use crate::compaction_group::group_split::*;
1632    use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1633    use crate::key::{FullKey, gen_key_from_str};
1634    use crate::key_range::KeyRange;
1635    use crate::level::{Level, Levels, OverlappingLevel};
1636    use crate::sstable_info::{SstableInfo, SstableInfoInner};
1637    use crate::version::{
1638        GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo,
1639        IntraLevelDelta,
1640    };
1641
1642    fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1643        gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1644    }
1645
1646    fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1647        let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1648        let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1649        let full_key_l = FullKey::for_test(
1650            TableId::new(*table_ids.first().unwrap()),
1651            table_key_l,
1652            epoch,
1653        )
1654        .encode();
1655        let full_key_r =
1656            FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1657                .encode();
1658
1659        SstableInfoInner {
1660            sst_id: sst_id.into(),
1661            key_range: KeyRange {
1662                left: full_key_l.into(),
1663                right: full_key_r.into(),
1664                right_exclusive: false,
1665            },
1666            table_ids: table_ids.into_iter().map(Into::into).collect(),
1667            object_id: sst_id.into(),
1668            min_epoch: 20,
1669            max_epoch: 20,
1670            file_size: 100,
1671            sst_size: 100,
1672            ..Default::default()
1673        }
1674    }
1675
1676    #[test]
1677    fn test_get_sst_object_ids() {
1678        let mut version = HummockVersion {
1679            id: HummockVersionId::new(0),
1680            levels: HashMap::from_iter([(
1681                0.into(),
1682                Levels {
1683                    levels: vec![],
1684                    l0: OverlappingLevel {
1685                        sub_levels: vec![],
1686                        total_file_size: 0,
1687                        uncompressed_file_size: 0,
1688                    },
1689                    ..Default::default()
1690                },
1691            )]),
1692            ..Default::default()
1693        };
1694        assert_eq!(version.get_object_ids().count(), 0);
1695
1696        // Add to sub level
1697        version
1698            .levels
1699            .get_mut(&0)
1700            .unwrap()
1701            .l0
1702            .sub_levels
1703            .push(Level {
1704                table_infos: vec![
1705                    SstableInfoInner {
1706                        object_id: 11.into(),
1707                        sst_id: 11.into(),
1708                        ..Default::default()
1709                    }
1710                    .into(),
1711                ],
1712                ..Default::default()
1713            });
1714        assert_eq!(version.get_object_ids().count(), 1);
1715
1716        // Add to non sub level
1717        version.levels.get_mut(&0).unwrap().levels.push(Level {
1718            table_infos: vec![
1719                SstableInfoInner {
1720                    object_id: 22.into(),
1721                    sst_id: 22.into(),
1722                    ..Default::default()
1723                }
1724                .into(),
1725            ],
1726            ..Default::default()
1727        });
1728        assert_eq!(version.get_object_ids().count(), 2);
1729    }
1730
1731    #[test]
1732    fn test_apply_version_delta() {
1733        let mut version = HummockVersion {
1734            id: HummockVersionId::new(0),
1735            levels: HashMap::from_iter([
1736                (
1737                    0.into(),
1738                    build_initial_compaction_group_levels(
1739                        0,
1740                        &CompactionConfig {
1741                            max_level: 6,
1742                            ..Default::default()
1743                        },
1744                    ),
1745                ),
1746                (
1747                    1.into(),
1748                    build_initial_compaction_group_levels(
1749                        1,
1750                        &CompactionConfig {
1751                            max_level: 6,
1752                            ..Default::default()
1753                        },
1754                    ),
1755                ),
1756            ]),
1757            ..Default::default()
1758        };
1759        let version_delta = HummockVersionDelta {
1760            id: HummockVersionId::new(1),
1761            group_deltas: HashMap::from_iter([
1762                (
1763                    2.into(),
1764                    GroupDeltas {
1765                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1766                            group_config: Some(CompactionConfig {
1767                                max_level: 6,
1768                                ..Default::default()
1769                            }),
1770                            ..Default::default()
1771                        }))],
1772                    },
1773                ),
1774                (
1775                    0.into(),
1776                    GroupDeltas {
1777                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1778                    },
1779                ),
1780                (
1781                    1.into(),
1782                    GroupDeltas {
1783                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1784                            1,
1785                            0,
1786                            HashSet::new(),
1787                            vec![
1788                                SstableInfoInner {
1789                                    object_id: 1.into(),
1790                                    sst_id: 1.into(),
1791                                    ..Default::default()
1792                                }
1793                                .into(),
1794                            ],
1795                            0,
1796                            version
1797                                .levels
1798                                .get(&1)
1799                                .as_ref()
1800                                .unwrap()
1801                                .compaction_group_version_id,
1802                        ))],
1803                    },
1804                ),
1805            ]),
1806            ..Default::default()
1807        };
1808        let version_delta = version_delta;
1809
1810        version.apply_version_delta(&version_delta);
1811        let mut cg1 = build_initial_compaction_group_levels(
1812            1,
1813            &CompactionConfig {
1814                max_level: 6,
1815                ..Default::default()
1816            },
1817        );
1818        cg1.levels[0] = Level {
1819            level_idx: 1,
1820            level_type: LevelType::Nonoverlapping,
1821            table_infos: vec![
1822                SstableInfoInner {
1823                    object_id: 1.into(),
1824                    sst_id: 1.into(),
1825                    ..Default::default()
1826                }
1827                .into(),
1828            ],
1829            ..Default::default()
1830        };
1831        assert_eq!(
1832            version,
1833            HummockVersion {
1834                id: HummockVersionId::new(1),
1835                levels: HashMap::from_iter([
1836                    (
1837                        2.into(),
1838                        build_initial_compaction_group_levels(
1839                            2,
1840                            &CompactionConfig {
1841                                max_level: 6,
1842                                ..Default::default()
1843                            },
1844                        ),
1845                    ),
1846                    (1.into(), cg1),
1847                ]),
1848                ..Default::default()
1849            }
1850        );
1851    }
1852
1853    fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1854        gen_sst_info_impl(object_id, table_ids, left, right).into()
1855    }
1856
1857    fn gen_sst_info_impl(
1858        object_id: u64,
1859        table_ids: Vec<u32>,
1860        left: Bytes,
1861        right: Bytes,
1862    ) -> SstableInfoInner {
1863        SstableInfoInner {
1864            object_id: object_id.into(),
1865            sst_id: object_id.into(),
1866            key_range: KeyRange {
1867                left,
1868                right,
1869                right_exclusive: false,
1870            },
1871            table_ids: table_ids.into_iter().map(Into::into).collect(),
1872            file_size: 100,
1873            sst_size: 100,
1874            uncompressed_file_size: 100,
1875            ..Default::default()
1876        }
1877    }
1878
1879    #[test]
1880    fn test_merge_levels() {
1881        let mut left_levels = build_initial_compaction_group_levels(
1882            1,
1883            &CompactionConfig {
1884                max_level: 6,
1885                ..Default::default()
1886            },
1887        );
1888
1889        let mut right_levels = build_initial_compaction_group_levels(
1890            2,
1891            &CompactionConfig {
1892                max_level: 6,
1893                ..Default::default()
1894            },
1895        );
1896
1897        left_levels.levels[0] = Level {
1898            level_idx: 1,
1899            level_type: LevelType::Nonoverlapping,
1900            table_infos: vec![
1901                gen_sst_info(
1902                    1,
1903                    vec![3],
1904                    FullKey::for_test(
1905                        TableId::new(3),
1906                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1907                        0,
1908                    )
1909                    .encode()
1910                    .into(),
1911                    FullKey::for_test(
1912                        TableId::new(3),
1913                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1914                        0,
1915                    )
1916                    .encode()
1917                    .into(),
1918                ),
1919                gen_sst_info(
1920                    10,
1921                    vec![3, 4],
1922                    FullKey::for_test(
1923                        TableId::new(3),
1924                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1925                        0,
1926                    )
1927                    .encode()
1928                    .into(),
1929                    FullKey::for_test(
1930                        TableId::new(4),
1931                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1932                        0,
1933                    )
1934                    .encode()
1935                    .into(),
1936                ),
1937                gen_sst_info(
1938                    11,
1939                    vec![4],
1940                    FullKey::for_test(
1941                        TableId::new(4),
1942                        gen_key_from_str(VirtualNode::from_index(11), "1"),
1943                        0,
1944                    )
1945                    .encode()
1946                    .into(),
1947                    FullKey::for_test(
1948                        TableId::new(4),
1949                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1950                        0,
1951                    )
1952                    .encode()
1953                    .into(),
1954                ),
1955            ],
1956            total_file_size: 300,
1957            ..Default::default()
1958        };
1959
1960        left_levels.l0.sub_levels.push(Level {
1961            level_idx: 0,
1962            table_infos: vec![gen_sst_info(
1963                3,
1964                vec![3],
1965                FullKey::for_test(
1966                    TableId::new(3),
1967                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1968                    0,
1969                )
1970                .encode()
1971                .into(),
1972                FullKey::for_test(
1973                    TableId::new(3),
1974                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1975                    0,
1976                )
1977                .encode()
1978                .into(),
1979            )],
1980            sub_level_id: 101,
1981            level_type: LevelType::Overlapping,
1982            total_file_size: 100,
1983            ..Default::default()
1984        });
1985
1986        left_levels.l0.sub_levels.push(Level {
1987            level_idx: 0,
1988            table_infos: vec![gen_sst_info(
1989                3,
1990                vec![3],
1991                FullKey::for_test(
1992                    TableId::new(3),
1993                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1994                    0,
1995                )
1996                .encode()
1997                .into(),
1998                FullKey::for_test(
1999                    TableId::new(3),
2000                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2001                    0,
2002                )
2003                .encode()
2004                .into(),
2005            )],
2006            sub_level_id: 103,
2007            level_type: LevelType::Overlapping,
2008            total_file_size: 100,
2009            ..Default::default()
2010        });
2011
2012        left_levels.l0.sub_levels.push(Level {
2013            level_idx: 0,
2014            table_infos: vec![gen_sst_info(
2015                3,
2016                vec![3],
2017                FullKey::for_test(
2018                    TableId::new(3),
2019                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2020                    0,
2021                )
2022                .encode()
2023                .into(),
2024                FullKey::for_test(
2025                    TableId::new(3),
2026                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2027                    0,
2028                )
2029                .encode()
2030                .into(),
2031            )],
2032            sub_level_id: 105,
2033            level_type: LevelType::Nonoverlapping,
2034            total_file_size: 100,
2035            ..Default::default()
2036        });
2037
2038        right_levels.levels[0] = Level {
2039            level_idx: 1,
2040            level_type: LevelType::Nonoverlapping,
2041            table_infos: vec![
2042                gen_sst_info(
2043                    1,
2044                    vec![5],
2045                    FullKey::for_test(
2046                        TableId::new(5),
2047                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2048                        0,
2049                    )
2050                    .encode()
2051                    .into(),
2052                    FullKey::for_test(
2053                        TableId::new(5),
2054                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2055                        0,
2056                    )
2057                    .encode()
2058                    .into(),
2059                ),
2060                gen_sst_info(
2061                    10,
2062                    vec![5, 6],
2063                    FullKey::for_test(
2064                        TableId::new(5),
2065                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2066                        0,
2067                    )
2068                    .encode()
2069                    .into(),
2070                    FullKey::for_test(
2071                        TableId::new(6),
2072                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2073                        0,
2074                    )
2075                    .encode()
2076                    .into(),
2077                ),
2078                gen_sst_info(
2079                    11,
2080                    vec![6],
2081                    FullKey::for_test(
2082                        TableId::new(6),
2083                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2084                        0,
2085                    )
2086                    .encode()
2087                    .into(),
2088                    FullKey::for_test(
2089                        TableId::new(6),
2090                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2091                        0,
2092                    )
2093                    .encode()
2094                    .into(),
2095                ),
2096            ],
2097            total_file_size: 300,
2098            ..Default::default()
2099        };
2100
2101        right_levels.l0.sub_levels.push(Level {
2102            level_idx: 0,
2103            table_infos: vec![gen_sst_info(
2104                3,
2105                vec![5],
2106                FullKey::for_test(
2107                    TableId::new(5),
2108                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2109                    0,
2110                )
2111                .encode()
2112                .into(),
2113                FullKey::for_test(
2114                    TableId::new(5),
2115                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2116                    0,
2117                )
2118                .encode()
2119                .into(),
2120            )],
2121            sub_level_id: 101,
2122            level_type: LevelType::Overlapping,
2123            total_file_size: 100,
2124            ..Default::default()
2125        });
2126
2127        right_levels.l0.sub_levels.push(Level {
2128            level_idx: 0,
2129            table_infos: vec![gen_sst_info(
2130                5,
2131                vec![5],
2132                FullKey::for_test(
2133                    TableId::new(5),
2134                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2135                    0,
2136                )
2137                .encode()
2138                .into(),
2139                FullKey::for_test(
2140                    TableId::new(5),
2141                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2142                    0,
2143                )
2144                .encode()
2145                .into(),
2146            )],
2147            sub_level_id: 102,
2148            level_type: LevelType::Overlapping,
2149            total_file_size: 100,
2150            ..Default::default()
2151        });
2152
2153        right_levels.l0.sub_levels.push(Level {
2154            level_idx: 0,
2155            table_infos: vec![gen_sst_info(
2156                3,
2157                vec![5],
2158                FullKey::for_test(
2159                    TableId::new(5),
2160                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2161                    0,
2162                )
2163                .encode()
2164                .into(),
2165                FullKey::for_test(
2166                    TableId::new(5),
2167                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2168                    0,
2169                )
2170                .encode()
2171                .into(),
2172            )],
2173            sub_level_id: 103,
2174            level_type: LevelType::Nonoverlapping,
2175            total_file_size: 100,
2176            ..Default::default()
2177        });
2178
2179        {
2180            // test empty
2181            let mut left_levels = Levels::default();
2182            let right_levels = Levels::default();
2183
2184            group_split::merge_levels(&mut left_levels, right_levels);
2185        }
2186
2187        {
2188            // test empty left
2189            let mut left_levels = build_initial_compaction_group_levels(
2190                1,
2191                &CompactionConfig {
2192                    max_level: 6,
2193                    ..Default::default()
2194                },
2195            );
2196            let right_levels = right_levels.clone();
2197
2198            group_split::merge_levels(&mut left_levels, right_levels);
2199
2200            assert!(left_levels.l0.sub_levels.len() == 3);
2201            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2202            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2203            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2204            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2205            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2206            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2207
2208            assert!(left_levels.levels[0].level_idx == 1);
2209            assert_eq!(300, left_levels.levels[0].total_file_size);
2210        }
2211
2212        {
2213            // test empty right
2214            let mut left_levels = left_levels.clone();
2215            let right_levels = build_initial_compaction_group_levels(
2216                2,
2217                &CompactionConfig {
2218                    max_level: 6,
2219                    ..Default::default()
2220                },
2221            );
2222
2223            group_split::merge_levels(&mut left_levels, right_levels);
2224
2225            assert!(left_levels.l0.sub_levels.len() == 3);
2226            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2227            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2228            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2229            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2230            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2231            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2232
2233            assert!(left_levels.levels[0].level_idx == 1);
2234            assert_eq!(300, left_levels.levels[0].total_file_size);
2235        }
2236
2237        {
2238            let mut left_levels = left_levels.clone();
2239            let right_levels = right_levels.clone();
2240
2241            group_split::merge_levels(&mut left_levels, right_levels);
2242
2243            assert!(left_levels.l0.sub_levels.len() == 6);
2244            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2245            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2246            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2247            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2248            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2249            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2250            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2251            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2252            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2253            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2254            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2255            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2256
2257            assert!(left_levels.levels[0].level_idx == 1);
2258            assert_eq!(600, left_levels.levels[0].total_file_size);
2259        }
2260    }
2261
2262    #[test]
2263    fn test_get_split_pos() {
2264        let epoch = test_epoch(1);
2265        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2266        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2267        let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2268
2269        let ssts = vec![s1, s2, s3];
2270        let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2271
2272        let pos = group_split::get_split_pos(&ssts, split_key.clone());
2273        assert_eq!(1, pos);
2274
2275        let pos = group_split::get_split_pos(&vec![], split_key);
2276        assert_eq!(0, pos);
2277    }
2278
2279    #[test]
2280    fn test_split_sst() {
2281        let epoch = test_epoch(1);
2282        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2283
2284        {
2285            let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
2286            let origin_sst = sst.clone();
2287            let sst_size = origin_sst.sst_size;
2288            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2289            assert_eq!(SstSplitType::Both, split_type);
2290
2291            let mut new_sst_id = 10.into();
2292            let (origin_sst, branched_sst) = group_split::split_sst(
2293                origin_sst,
2294                &mut new_sst_id,
2295                split_key,
2296                sst_size / 2,
2297                sst_size / 2,
2298            );
2299
2300            let origin_sst = origin_sst.unwrap();
2301            let branched_sst = branched_sst.unwrap();
2302
2303            assert!(origin_sst.key_range.right_exclusive);
2304            assert!(
2305                origin_sst
2306                    .key_range
2307                    .right
2308                    .cmp(&branched_sst.key_range.left)
2309                    .is_le()
2310            );
2311            assert!(origin_sst.table_ids.is_sorted());
2312            assert!(branched_sst.table_ids.is_sorted());
2313            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2314            assert!(branched_sst.sst_size < origin_sst.file_size);
2315            assert_eq!(10, branched_sst.sst_id);
2316            assert_eq!(11, origin_sst.sst_id);
2317            assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2318        }
2319
2320        {
2321            // test un-exist table_id
2322            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2323            let origin_sst = sst.clone();
2324            let sst_size = origin_sst.sst_size;
2325            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2326            assert_eq!(SstSplitType::Both, split_type);
2327
2328            let mut new_sst_id = 10.into();
2329            let (origin_sst, branched_sst) = group_split::split_sst(
2330                origin_sst,
2331                &mut new_sst_id,
2332                split_key,
2333                sst_size / 2,
2334                sst_size / 2,
2335            );
2336
2337            let origin_sst = origin_sst.unwrap();
2338            let branched_sst = branched_sst.unwrap();
2339
2340            assert!(origin_sst.key_range.right_exclusive);
2341            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2342            assert!(origin_sst.table_ids.is_sorted());
2343            assert!(branched_sst.table_ids.is_sorted());
2344            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2345            assert!(branched_sst.sst_size < origin_sst.file_size);
2346            assert_eq!(10, branched_sst.sst_id);
2347            assert_eq!(11, origin_sst.sst_id);
2348            assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2349        }
2350
2351        {
2352            let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
2353            let split_type = group_split::need_to_split(&sst, split_key);
2354            assert_eq!(SstSplitType::Left, split_type);
2355        }
2356
2357        {
2358            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2359            let origin_sst = sst.clone();
2360            let split_type = group_split::need_to_split(&origin_sst, split_key);
2361            assert_eq!(SstSplitType::Both, split_type);
2362
2363            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2364            let origin_sst = sst;
2365            let split_type = group_split::need_to_split(&origin_sst, split_key);
2366            assert_eq!(SstSplitType::Right, split_type);
2367        }
2368
2369        {
2370            // test key_range left = right
2371            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2372            sst.key_range.right = sst.key_range.left.clone();
2373            let sst: SstableInfo = sst.into();
2374            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2375            let origin_sst = sst;
2376            let sst_size = origin_sst.sst_size;
2377
2378            let mut new_sst_id = 10.into();
2379            let (origin_sst, branched_sst) = group_split::split_sst(
2380                origin_sst,
2381                &mut new_sst_id,
2382                split_key,
2383                sst_size / 2,
2384                sst_size / 2,
2385            );
2386
2387            assert!(origin_sst.is_none());
2388            assert!(branched_sst.is_some());
2389        }
2390    }
2391
2392    #[test]
2393    fn test_split_sst_info_for_level() {
2394        let mut version = HummockVersion {
2395            id: HummockVersionId::new(0),
2396            levels: HashMap::from_iter([(
2397                1.into(),
2398                build_initial_compaction_group_levels(
2399                    1,
2400                    &CompactionConfig {
2401                        max_level: 6,
2402                        ..Default::default()
2403                    },
2404                ),
2405            )]),
2406            ..Default::default()
2407        };
2408
2409        let cg1 = version.levels.get_mut(&1).unwrap();
2410
2411        cg1.levels[0] = Level {
2412            level_idx: 1,
2413            level_type: LevelType::Nonoverlapping,
2414            table_infos: vec![
2415                gen_sst_info(
2416                    1,
2417                    vec![3],
2418                    FullKey::for_test(
2419                        TableId::new(3),
2420                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2421                        0,
2422                    )
2423                    .encode()
2424                    .into(),
2425                    FullKey::for_test(
2426                        TableId::new(3),
2427                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2428                        0,
2429                    )
2430                    .encode()
2431                    .into(),
2432                ),
2433                gen_sst_info(
2434                    10,
2435                    vec![3, 4],
2436                    FullKey::for_test(
2437                        TableId::new(3),
2438                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2439                        0,
2440                    )
2441                    .encode()
2442                    .into(),
2443                    FullKey::for_test(
2444                        TableId::new(4),
2445                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2446                        0,
2447                    )
2448                    .encode()
2449                    .into(),
2450                ),
2451                gen_sst_info(
2452                    11,
2453                    vec![4],
2454                    FullKey::for_test(
2455                        TableId::new(4),
2456                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2457                        0,
2458                    )
2459                    .encode()
2460                    .into(),
2461                    FullKey::for_test(
2462                        TableId::new(4),
2463                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2464                        0,
2465                    )
2466                    .encode()
2467                    .into(),
2468                ),
2469            ],
2470            total_file_size: 300,
2471            ..Default::default()
2472        };
2473
2474        cg1.l0.sub_levels.push(Level {
2475            level_idx: 0,
2476            table_infos: vec![
2477                gen_sst_info(
2478                    2,
2479                    vec![2],
2480                    FullKey::for_test(
2481                        TableId::new(0),
2482                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2483                        0,
2484                    )
2485                    .encode()
2486                    .into(),
2487                    FullKey::for_test(
2488                        TableId::new(2),
2489                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2490                        0,
2491                    )
2492                    .encode()
2493                    .into(),
2494                ),
2495                gen_sst_info(
2496                    22,
2497                    vec![2],
2498                    FullKey::for_test(
2499                        TableId::new(0),
2500                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2501                        0,
2502                    )
2503                    .encode()
2504                    .into(),
2505                    FullKey::for_test(
2506                        TableId::new(2),
2507                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2508                        0,
2509                    )
2510                    .encode()
2511                    .into(),
2512                ),
2513                gen_sst_info(
2514                    23,
2515                    vec![2],
2516                    FullKey::for_test(
2517                        TableId::new(0),
2518                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2519                        0,
2520                    )
2521                    .encode()
2522                    .into(),
2523                    FullKey::for_test(
2524                        TableId::new(2),
2525                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2526                        0,
2527                    )
2528                    .encode()
2529                    .into(),
2530                ),
2531                gen_sst_info(
2532                    24,
2533                    vec![2],
2534                    FullKey::for_test(
2535                        TableId::new(2),
2536                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2537                        0,
2538                    )
2539                    .encode()
2540                    .into(),
2541                    FullKey::for_test(
2542                        TableId::new(2),
2543                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2544                        0,
2545                    )
2546                    .encode()
2547                    .into(),
2548                ),
2549                gen_sst_info(
2550                    25,
2551                    vec![2],
2552                    FullKey::for_test(
2553                        TableId::new(0),
2554                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2555                        0,
2556                    )
2557                    .encode()
2558                    .into(),
2559                    FullKey::for_test(
2560                        TableId::new(0),
2561                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2562                        0,
2563                    )
2564                    .encode()
2565                    .into(),
2566                ),
2567            ],
2568            sub_level_id: 101,
2569            level_type: LevelType::Overlapping,
2570            total_file_size: 300,
2571            ..Default::default()
2572        });
2573
2574        {
2575            // split Overlapping level
2576            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2577
2578            let mut new_sst_id = 100.into();
2579            let x = group_split::split_sst_info_for_level_v2(
2580                &mut cg1.l0.sub_levels[0],
2581                &mut new_sst_id,
2582                split_key,
2583            );
2584            // assert_eq!(3, x.len());
2585            // assert_eq!(100, x[0].sst_id);
2586            // assert_eq!(100, x[0].sst_size);
2587            // assert_eq!(101, x[1].sst_id);
2588            // assert_eq!(100, x[1].sst_size);
2589            // assert_eq!(102, x[2].sst_id);
2590            // assert_eq!(100, x[2].sst_size);
2591
2592            let mut right_l0 = OverlappingLevel {
2593                sub_levels: vec![],
2594                total_file_size: 0,
2595                uncompressed_file_size: 0,
2596            };
2597
2598            right_l0.sub_levels.push(Level {
2599                level_idx: 0,
2600                table_infos: x,
2601                sub_level_id: 101,
2602                total_file_size: 100,
2603                level_type: LevelType::Overlapping,
2604                ..Default::default()
2605            });
2606
2607            let right_levels = Levels {
2608                levels: vec![],
2609                l0: right_l0,
2610                ..Default::default()
2611            };
2612
2613            merge_levels(cg1, right_levels);
2614        }
2615
2616        {
2617            // test split empty level
2618            let mut new_sst_id = 100.into();
2619            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2620            let x = group_split::split_sst_info_for_level_v2(
2621                &mut cg1.levels[2],
2622                &mut new_sst_id,
2623                split_key,
2624            );
2625
2626            assert!(x.is_empty());
2627        }
2628
2629        {
2630            // test split to right Nonoverlapping level
2631            let mut cg1 = cg1.clone();
2632            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2633
2634            let mut new_sst_id = 100.into();
2635            let x = group_split::split_sst_info_for_level_v2(
2636                &mut cg1.levels[0],
2637                &mut new_sst_id,
2638                split_key,
2639            );
2640
2641            assert_eq!(3, x.len());
2642            assert_eq!(1, x[0].sst_id);
2643            assert_eq!(100, x[0].sst_size);
2644            assert_eq!(10, x[1].sst_id);
2645            assert_eq!(100, x[1].sst_size);
2646            assert_eq!(11, x[2].sst_id);
2647            assert_eq!(100, x[2].sst_size);
2648
2649            assert_eq!(0, cg1.levels[0].table_infos.len());
2650        }
2651
2652        {
2653            // test split to left Nonoverlapping level
2654            let mut cg1 = cg1.clone();
2655            let split_key = group_split::build_split_key(5.into(), VirtualNode::ZERO);
2656
2657            let mut new_sst_id = 100.into();
2658            let x = group_split::split_sst_info_for_level_v2(
2659                &mut cg1.levels[0],
2660                &mut new_sst_id,
2661                split_key,
2662            );
2663
2664            assert_eq!(0, x.len());
2665            assert_eq!(3, cg1.levels[0].table_infos.len());
2666        }
2667
2668        // {
2669        //     // test split to both Nonoverlapping level
2670        //     let mut cg1 = cg1.clone();
2671        //     let split_key = build_split_key(3, VirtualNode::MAX);
2672
2673        //     let mut new_sst_id = 100;
2674        //     let x = group_split::split_sst_info_for_level_v2(
2675        //         &mut cg1.levels[0],
2676        //         &mut new_sst_id,
2677        //         split_key,
2678        //     );
2679
2680        //     assert_eq!(2, x.len());
2681        //     assert_eq!(100, x[0].sst_id);
2682        //     assert_eq!(100 / 2, x[0].sst_size);
2683        //     assert_eq!(11, x[1].sst_id);
2684        //     assert_eq!(100, x[1].sst_size);
2685        //     assert_eq!(vec![3, 4], x[0].table_ids);
2686
2687        //     assert_eq!(2, cg1.levels[0].table_infos.len());
2688        //     assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2689        //     assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2690        //     assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2691        // }
2692
2693        {
2694            // test split to both Nonoverlapping level
2695            let mut cg1 = cg1.clone();
2696            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2697
2698            let mut new_sst_id = 100.into();
2699            let x = group_split::split_sst_info_for_level_v2(
2700                &mut cg1.levels[0],
2701                &mut new_sst_id,
2702                split_key,
2703            );
2704
2705            assert_eq!(2, x.len());
2706            assert_eq!(100, x[0].sst_id);
2707            assert_eq!(100 / 2, x[0].sst_size);
2708            assert_eq!(11, x[1].sst_id);
2709            assert_eq!(100, x[1].sst_size);
2710            assert_eq!(vec![TableId::new(4)], x[1].table_ids);
2711
2712            assert_eq!(2, cg1.levels[0].table_infos.len());
2713            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2714            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2715            assert_eq!(
2716                vec![TableId::new(3)],
2717                cg1.levels[0].table_infos[1].table_ids
2718            );
2719        }
2720    }
2721
2722    fn make_sst(sst_id: u64, table_ids: Vec<u32>, sst_size: u64) -> SstableInfo {
2723        SstableInfoInner {
2724            sst_id: sst_id.into(),
2725            object_id: sst_id.into(),
2726            table_ids: table_ids.into_iter().map(TableId::new).collect(),
2727            file_size: sst_size,
2728            sst_size,
2729            uncompressed_file_size: sst_size * 2,
2730            ..Default::default()
2731        }
2732        .into()
2733    }
2734
    /// `Level::normalize` drops SSTs whose `table_ids` is empty and recomputes the level's
    /// `total_file_size` / `uncompressed_file_size` from the SSTs that remain. Sizes come from
    /// `make_sst`, so each SST contributes its size and twice that uncompressed.
    #[test]
    fn test_level_normalize() {
        // Mixed: some SSTs empty, some not.
        let mut level = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                make_sst(1, vec![1, 2], 100),
                make_sst(2, vec![], 200), // empty → removed
                make_sst(3, vec![3], 300),
                make_sst(4, vec![], 400), // empty → removed
            ],
            total_file_size: 9999, // intentionally wrong
            uncompressed_file_size: 9999,
            ..Default::default()
        };

        level.normalize();

        // SSTs 1 and 3 survive in their original order: 100 + 300 and 200 + 600.
        assert_eq!(level.table_infos.len(), 2);
        assert_eq!(1, level.table_infos[0].sst_id);
        assert_eq!(3, level.table_infos[1].sst_id);
        assert_eq!(level.total_file_size, 400);
        assert_eq!(level.uncompressed_file_size, 800);

        // No empty SSTs: just recompute sizes (the zeroed counters must be restored).
        level.total_file_size = 0;
        level.uncompressed_file_size = 0;
        level.normalize();
        assert_eq!(level.table_infos.len(), 2);
        assert_eq!(level.total_file_size, 400);
        assert_eq!(level.uncompressed_file_size, 800);

        // All empty → level becomes empty and both size counters drop to zero.
        level.table_infos = vec![make_sst(10, vec![], 100), make_sst(11, vec![], 200)];
        level.normalize();
        assert!(level.table_infos.is_empty());
        assert_eq!(level.total_file_size, 0);
        assert_eq!(level.uncompressed_file_size, 0);
    }
2775
2776    #[test]
2777    fn test_level_delete_ssts() {
2778        let mut level = Level {
2779            level_idx: 1,
2780            level_type: LevelType::Nonoverlapping,
2781            table_infos: vec![
2782                make_sst(1, vec![1], 100),
2783                make_sst(2, vec![2], 200),
2784                make_sst(3, vec![3], 300),
2785            ],
2786            total_file_size: 600,
2787            uncompressed_file_size: 1200,
2788            ..Default::default()
2789        };
2790
2791        let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([2.into()]);
2792        let changed = level.delete_ssts(&delete_ids);
2793
2794        assert!(changed);
2795        assert_eq!(level.table_infos.len(), 2);
2796        assert_eq!(1, level.table_infos[0].sst_id);
2797        assert_eq!(3, level.table_infos[1].sst_id);
2798        assert_eq!(level.total_file_size, 400);
2799        assert_eq!(level.uncompressed_file_size, 800);
2800
2801        // Delete non-existent id → no change, returns false.
2802        let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([999.into()]);
2803        let changed = level.delete_ssts(&delete_ids);
2804        assert!(!changed);
2805        assert_eq!(level.table_infos.len(), 2);
2806    }
2807
    /// `Level::prune_table_ids_from_ssts` removes the given table ids from every SST's
    /// `table_ids`. An SST left with no tables is dropped, and the level sizes are recomputed
    /// from the survivors.
    #[test]
    fn test_level_prune_table_ids_from_ssts() {
        let mut level = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                make_sst(1, vec![1, 2], 100),
                make_sst(2, vec![2, 3], 200),
                make_sst(3, vec![2], 300),
            ],
            total_file_size: 600,
            uncompressed_file_size: 1200,
            ..Default::default()
        };

        let pruned_table_ids = HashSet::from([TableId::new(2)]);
        level.prune_table_ids_from_ssts(&pruned_table_ids);

        // SST 3 should be removed (was only table_id=2).
        // Surviving SSTs keep their full sizes: pruning only edits `table_ids`, so the level
        // still counts 100 + 200 (and 200 + 400 uncompressed).
        assert_eq!(level.table_infos.len(), 2);
        assert_eq!(level.table_infos[0].table_ids, vec![TableId::new(1)]);
        assert_eq!(level.table_infos[1].table_ids, vec![TableId::new(3)]);
        assert_eq!(level.total_file_size, 100 + 200);
        assert_eq!(level.uncompressed_file_size, 200 + 400);

        // Prune remaining tables, so all SSTs are removed.
        let pruned_table_ids = HashSet::from([TableId::new(1), TableId::new(3)]);
        level.prune_table_ids_from_ssts(&pruned_table_ids);
        assert!(level.table_infos.is_empty());
        assert_eq!(level.total_file_size, 0);
        assert_eq!(level.uncompressed_file_size, 0);
    }
2840
2841    #[test]
2842    fn test_overlapping_level_normalize() {
2843        let mut l0 = OverlappingLevel {
2844            sub_levels: vec![
2845                Level {
2846                    level_idx: 0,
2847                    table_infos: vec![make_sst(1, vec![1], 100)],
2848                    total_file_size: 100,
2849                    uncompressed_file_size: 200,
2850                    sub_level_id: 1,
2851                    ..Default::default()
2852                },
2853                Level {
2854                    level_idx: 0,
2855                    table_infos: vec![], // empty → should be removed
2856                    total_file_size: 0,
2857                    uncompressed_file_size: 0,
2858                    sub_level_id: 2,
2859                    ..Default::default()
2860                },
2861                Level {
2862                    level_idx: 0,
2863                    table_infos: vec![make_sst(3, vec![3], 300)],
2864                    total_file_size: 300,
2865                    uncompressed_file_size: 600,
2866                    sub_level_id: 3,
2867                    ..Default::default()
2868                },
2869            ],
2870            total_file_size: 9999, // intentionally wrong
2871            uncompressed_file_size: 9999,
2872        };
2873
2874        l0.normalize();
2875
2876        assert_eq!(l0.sub_levels.len(), 2);
2877        assert_eq!(l0.sub_levels[0].sub_level_id, 1);
2878        assert_eq!(l0.sub_levels[1].sub_level_id, 3);
2879        assert_eq!(l0.total_file_size, 100 + 300);
2880        assert_eq!(l0.uncompressed_file_size, 200 + 600);
2881
2882        // All sub-levels empty → normalize clears everything.
2883        l0.sub_levels = vec![Level {
2884            level_idx: 0,
2885            table_infos: vec![],
2886            ..Default::default()
2887        }];
2888        l0.normalize();
2889        assert!(l0.sub_levels.is_empty());
2890        assert_eq!(l0.total_file_size, 0);
2891        assert_eq!(l0.uncompressed_file_size, 0);
2892    }
2893
    /// `Levels::prune_table_ids_from_ssts` prunes a table from the SSTs of every level. It:
    /// - drops SSTs and L0 sub-levels that become empty;
    /// - recomputes sizes at each level and for L0 as a whole;
    /// - bumps `compaction_group_version_id` (0 → 1 here).
    #[test]
    fn test_levels_prune_table_ids_from_ssts() {
        // NOTE(review): presumably required because a `Levels` field set below (likely
        // `member_table_ids`) is deprecated — confirm against the struct definition.
        #[expect(deprecated)]
        let mut levels = Levels {
            l0: OverlappingLevel {
                sub_levels: vec![
                    Level {
                        level_idx: 0,
                        table_infos: vec![
                            make_sst(1, vec![10], 100), // table 10
                            make_sst(2, vec![20], 200), // table 20
                        ],
                        total_file_size: 300,
                        uncompressed_file_size: 600,
                        sub_level_id: 1,
                        ..Default::default()
                    },
                    // Holds only table 10, so the whole sub-level should vanish after pruning.
                    Level {
                        level_idx: 0,
                        table_infos: vec![
                            make_sst(3, vec![10], 150), // table 10
                        ],
                        total_file_size: 150,
                        uncompressed_file_size: 300,
                        sub_level_id: 2,
                        ..Default::default()
                    },
                ],
                total_file_size: 450,
                uncompressed_file_size: 900,
            },
            levels: vec![Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    make_sst(4, vec![10, 20], 400), // shared SST
                    make_sst(5, vec![10], 500),     // table 10 only
                ],
                total_file_size: 900,
                uncompressed_file_size: 1800,
                ..Default::default()
            }],
            group_id: 1.into(),
            parent_group_id: 0.into(),
            member_table_ids: vec![],
            compaction_group_version_id: 0,
        };

        // Prune table 10 from SST metadata.
        levels.prune_table_ids_from_ssts(&HashSet::from([TableId::new(10)]));

        // L0: only sub-level 1 remains, holding only SST 2 (table 20).
        assert_eq!(levels.l0.sub_levels.len(), 1);
        assert_eq!(levels.l0.sub_levels[0].sub_level_id, 1);
        assert_eq!(levels.l0.sub_levels[0].table_infos.len(), 1);
        assert_eq!(2, levels.l0.sub_levels[0].table_infos[0].sst_id);
        assert_eq!(levels.l0.sub_levels[0].total_file_size, 200);
        assert_eq!(levels.l0.sub_levels[0].uncompressed_file_size, 400);

        assert_eq!(levels.l0.total_file_size, 200);
        assert_eq!(levels.l0.uncompressed_file_size, 400);

        // L1: the shared SST 4 survives with only table 20 and keeps its full size.
        // SST 5 (table 10 only) is gone.
        assert_eq!(levels.levels[0].table_infos.len(), 1);
        assert_eq!(4, levels.levels[0].table_infos[0].sst_id);
        assert_eq!(
            levels.levels[0].table_infos[0].table_ids,
            vec![TableId::new(20)]
        );
        assert_eq!(levels.levels[0].total_file_size, 400);
        assert_eq!(levels.levels[0].uncompressed_file_size, 800);

        assert_eq!(levels.compaction_group_version_id, 1);
    }
2966
    /// Applying a `GroupDelta::PruneTableIdsFromSsts` through `apply_version_delta` has the
    /// same effect on the target compaction group as calling the pruning directly:
    /// - emptied SSTs and sub-levels are removed;
    /// - sizes are recomputed;
    /// - the group's version id is bumped.
    #[test]
    fn test_apply_version_delta_prune_table_ids_from_ssts() {
        let mut version = HummockVersion {
            id: HummockVersionId::new(0),
            levels: HashMap::from_iter([(1.into(), {
                // The `#[expect]` attribute is attached to a `let` statement rather than to the
                // struct expression; that is why the value is bound to `levels` and then returned.
                #[expect(deprecated)]
                let levels = Levels {
                    l0: OverlappingLevel {
                        sub_levels: vec![
                            Level {
                                level_idx: 0,
                                level_type: LevelType::Overlapping,
                                table_infos: vec![
                                    make_sst(1, vec![100], 50), // only table 100
                                    make_sst(2, vec![200], 60), // only table 200
                                ],
                                total_file_size: 110,
                                uncompressed_file_size: 220,
                                sub_level_id: 1,
                                ..Default::default()
                            },
                            Level {
                                level_idx: 0,
                                level_type: LevelType::Overlapping,
                                table_infos: vec![
                                    make_sst(3, vec![100], 70), // only table 100
                                ],
                                total_file_size: 70,
                                uncompressed_file_size: 140,
                                sub_level_id: 2,
                                ..Default::default()
                            },
                        ],
                        total_file_size: 180,
                        uncompressed_file_size: 360,
                    },
                    levels: vec![Level {
                        level_idx: 1,
                        level_type: LevelType::Nonoverlapping,
                        table_infos: vec![
                            make_sst(4, vec![100, 200], 80), // shared
                            make_sst(5, vec![100], 90),      // only table 100
                        ],
                        total_file_size: 170,
                        uncompressed_file_size: 340,
                        ..Default::default()
                    }],
                    group_id: 1.into(),
                    parent_group_id: 0.into(),
                    member_table_ids: vec![],
                    compaction_group_version_id: 0,
                };
                levels
            })]),
            ..Default::default()
        };

        // A delta whose only effect is to prune table 100 from group 1's SSTs.
        let version_delta = HummockVersionDelta {
            id: HummockVersionId::new(1),
            group_deltas: HashMap::from_iter([(
                1.into(),
                GroupDeltas {
                    group_deltas: vec![GroupDelta::PruneTableIdsFromSsts(HashSet::from([
                        TableId::new(100),
                    ]))],
                },
            )]),
            ..Default::default()
        };

        version.apply_version_delta(&version_delta);

        let cg = version.get_compaction_group_levels(1.into());

        // L0: sub-level 2 held only table 100 and is gone; sub-level 1 keeps just SST 2.
        assert_eq!(
            cg.l0.sub_levels.len(),
            1,
            "empty sub-level should be removed"
        );
        assert_eq!(cg.l0.sub_levels[0].sub_level_id, 1);
        assert_eq!(cg.l0.sub_levels[0].table_infos.len(), 1);
        assert_eq!(2, cg.l0.sub_levels[0].table_infos[0].sst_id);
        assert_eq!(
            cg.l0.sub_levels[0].table_infos[0].table_ids,
            vec![TableId::new(200)]
        );

        assert_eq!(cg.l0.total_file_size, 60);
        assert_eq!(cg.l0.uncompressed_file_size, 120);

        // L1: shared SST 4 survives with table 200 only; SST 5 is gone.
        assert_eq!(cg.levels[0].table_infos.len(), 1);
        assert_eq!(4, cg.levels[0].table_infos[0].sst_id);
        assert_eq!(
            cg.levels[0].table_infos[0].table_ids,
            vec![TableId::new(200)]
        );
        assert_eq!(cg.levels[0].total_file_size, 80);
        assert_eq!(cg.levels[0].uncompressed_file_size, 160);

        assert_eq!(cg.compaction_group_version_id, 1);
    }
3068
    /// `prune_stale_table_ids_from_ssts` removes from the SSTs any table id that has no entry in
    /// `state_table_info`. Here table 200 is stale and table 100 is live.
    #[test]
    fn test_prune_stale_table_ids_from_ssts() {
        let live_table_id = TableId::new(100);
        let stale_table_id = TableId::new(200);
        let mut version = HummockVersion {
            id: HummockVersionId::new(0),
            levels: HashMap::from_iter([(1.into(), {
                #[expect(deprecated)]
                let levels = Levels {
                    l0: OverlappingLevel {
                        // SST 1 mixes the live and the stale table.
                        sub_levels: vec![Level {
                            level_idx: 0,
                            level_type: LevelType::Overlapping,
                            table_infos: vec![make_sst(
                                1,
                                vec![live_table_id.as_raw_id(), stale_table_id.as_raw_id()],
                                50,
                            )],
                            total_file_size: 50,
                            uncompressed_file_size: 100,
                            sub_level_id: 1,
                            ..Default::default()
                        }],
                        total_file_size: 50,
                        uncompressed_file_size: 100,
                    },
                    // SST 2 holds only the stale table.
                    levels: vec![Level {
                        level_idx: 1,
                        level_type: LevelType::Nonoverlapping,
                        table_infos: vec![make_sst(2, vec![stale_table_id.as_raw_id()], 60)],
                        total_file_size: 60,
                        uncompressed_file_size: 120,
                        ..Default::default()
                    }],
                    group_id: 1.into(),
                    parent_group_id: 0.into(),
                    member_table_ids: vec![],
                    compaction_group_version_id: 0,
                };
                levels
            })]),
            // Only the live table is registered; this is what makes table 200 "stale".
            state_table_info: HummockVersionStateTableInfo::from_protobuf_owned(
                HashMap::from_iter([(
                    live_table_id,
                    StateTableInfo {
                        committed_epoch: 1,
                        compaction_group_id: 1.into(),
                    },
                )]),
            ),
            ..Default::default()
        };

        // NOTE(review): the returned count is 1 here, presumably the number of compaction
        // groups (or stale tables) pruned — confirm against the implementation.
        assert_eq!(version.prune_stale_table_ids_from_ssts(), 1);

        // SST 1 keeps only the live table; SST 2 (stale only) is removed from L1.
        let cg = version.get_compaction_group_levels(1.into());
        assert_eq!(cg.l0.sub_levels.len(), 1);
        assert_eq!(cg.l0.sub_levels[0].table_infos.len(), 1);
        assert_eq!(cg.l0.sub_levels[0].table_infos[0].sst_id, 1);
        assert_eq!(
            cg.l0.sub_levels[0].table_infos[0].table_ids,
            vec![live_table_id]
        );
        assert!(cg.levels[0].table_infos.is_empty());
        assert_eq!(cg.compaction_group_version_id, 1);
    }
3135
3136    #[test]
3137    fn test_prune_stale_table_ids_from_ssts_skips_legacy_member_table_ids() {
3138        let mut version = HummockVersion {
3139            id: HummockVersionId::new(0),
3140            levels: HashMap::from_iter([(1.into(), {
3141                #[expect(deprecated)]
3142                let levels = Levels {
3143                    l0: OverlappingLevel {
3144                        sub_levels: vec![Level {
3145                            level_idx: 0,
3146                            level_type: LevelType::Overlapping,
3147                            table_infos: vec![make_sst(1, vec![100, 200], 50)],
3148                            total_file_size: 50,
3149                            uncompressed_file_size: 100,
3150                            sub_level_id: 1,
3151                            ..Default::default()
3152                        }],
3153                        total_file_size: 50,
3154                        uncompressed_file_size: 100,
3155                    },
3156                    group_id: 1.into(),
3157                    parent_group_id: 0.into(),
3158                    member_table_ids: vec![100, 200],
3159                    compaction_group_version_id: 0,
3160                    ..Default::default()
3161                };
3162                levels
3163            })]),
3164            ..Default::default()
3165        };
3166
3167        assert_eq!(version.prune_stale_table_ids_from_ssts(), 0);
3168
3169        let cg = version.get_compaction_group_levels(1.into());
3170        assert_eq!(
3171            cg.l0.sub_levels[0].table_infos[0].table_ids,
3172            vec![TableId::new(100), TableId::new(200)]
3173        );
3174        assert_eq!(cg.compaction_group_version_id, 0);
3175    }
3176
3177    #[test]
3178    fn test_apply_version_delta_compact_l0() {
3179        let mut version = HummockVersion {
3180            id: HummockVersionId::new(0),
3181            levels: HashMap::from_iter([(1.into(), {
3182                #[expect(deprecated)]
3183                let levels = Levels {
3184                    l0: OverlappingLevel {
3185                        sub_levels: vec![
3186                            Level {
3187                                level_idx: 0,
3188                                level_type: LevelType::Nonoverlapping,
3189                                table_infos: vec![
3190                                    make_sst(1, vec![1], 100),
3191                                    make_sst(2, vec![2], 200),
3192                                ],
3193                                total_file_size: 300,
3194                                uncompressed_file_size: 600,
3195                                sub_level_id: 1,
3196                                ..Default::default()
3197                            },
3198                            Level {
3199                                level_idx: 0,
3200                                level_type: LevelType::Nonoverlapping,
3201                                table_infos: vec![make_sst(3, vec![3], 300)],
3202                                total_file_size: 300,
3203                                uncompressed_file_size: 600,
3204                                sub_level_id: 2,
3205                                ..Default::default()
3206                            },
3207                        ],
3208                        total_file_size: 600,
3209                        uncompressed_file_size: 1200,
3210                    },
3211                    levels: vec![Level {
3212                        level_idx: 1,
3213                        level_type: LevelType::Nonoverlapping,
3214                        table_infos: vec![],
3215                        total_file_size: 0,
3216                        uncompressed_file_size: 0,
3217                        ..Default::default()
3218                    }],
3219                    group_id: 1.into(),
3220                    parent_group_id: 0.into(),
3221                    member_table_ids: vec![],
3222                    compaction_group_version_id: 0,
3223                };
3224                levels
3225            })]),
3226            ..Default::default()
3227        };
3228
3229        let version_delta = HummockVersionDelta {
3230            id: HummockVersionId::new(1),
3231            group_deltas: HashMap::from_iter([(
3232                1.into(),
3233                GroupDeltas {
3234                    group_deltas: vec![
3235                        GroupDelta::IntraLevel(IntraLevelDelta::new(
3236                            0, // L0
3237                            0,
3238                            HashSet::from([1.into(), 2.into(), 3.into()]),
3239                            vec![],
3240                            0,
3241                            0,
3242                        )),
3243                        GroupDelta::IntraLevel(IntraLevelDelta::new(
3244                            1, // L1
3245                            0,
3246                            HashSet::new(),
3247                            vec![make_sst(10, vec![1, 2, 3], 500)],
3248                            0,
3249                            0,
3250                        )),
3251                    ],
3252                },
3253            )]),
3254            ..Default::default()
3255        };
3256
3257        version.apply_version_delta(&version_delta);
3258
3259        let cg = version.get_compaction_group_levels(1.into());
3260
3261        assert!(cg.l0.sub_levels.is_empty());
3262        assert_eq!(cg.l0.total_file_size, 0);
3263        assert_eq!(cg.l0.uncompressed_file_size, 0);
3264
3265        assert_eq!(cg.levels[0].table_infos.len(), 1);
3266        assert_eq!(10, cg.levels[0].table_infos[0].sst_id);
3267        assert_eq!(cg.levels[0].total_file_size, 500);
3268    }
3269}