risingwave_hummock_sdk/compaction_group/hummock_version_ext.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::{Arc, LazyLock};
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_common::log::LogSuppressor;
27use risingwave_pb::hummock::{
28    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
29};
30use tracing::warn;
31
32use super::group_split::split_sst_with_table_ids;
33use super::{StateTableId, group_split};
34use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
35use crate::compact_task::is_compaction_task_expired;
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::key_range::KeyRangeCommon;
38use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
41use crate::vector_index::apply_vector_index_delta;
42use crate::version::{
43    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
44    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
45};
46use crate::{
47    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
48};
49
/// SST-level changes that a version delta applies to a single compaction group, as collected by
/// `HummockVersionCommon::build_sst_delta_infos`.
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    /// Level index the inserted SSTs belong to (0 for L0). If several deltas of the group insert
    /// SSTs, the level of the last such delta is kept.
    pub insert_sst_level: u32,
    /// SSTs inserted by the delta, in delta order.
    pub insert_sst_infos: Vec<SstableInfo>,
    /// Object ids of the SSTs removed by the delta.
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}

/// SST ids grouped by compaction group id.
/// NOTE(review): presumably lists SSTs that are shared ("branched") across groups — confirm.
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
58
59impl<L> HummockVersionCommon<SstableInfo, L> {
60    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
61        self.levels
62            .get(&compaction_group_id)
63            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
64    }
65
66    pub fn get_compaction_group_levels_mut(
67        &mut self,
68        compaction_group_id: CompactionGroupId,
69    ) -> &mut Levels {
70        self.levels
71            .get_mut(&compaction_group_id)
72            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
73    }
74
75    // only scan the sst infos from levels in the specified compaction group (without table change log)
76    pub fn get_sst_ids_by_group_id(
77        &self,
78        compaction_group_id: CompactionGroupId,
79    ) -> impl Iterator<Item = HummockSstableId> + '_ {
80        self.levels
81            .iter()
82            .filter_map(move |(cg_id, level)| {
83                if *cg_id == compaction_group_id {
84                    Some(level)
85                } else {
86                    None
87                }
88            })
89            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
90            .flat_map(|level| level.table_infos.iter())
91            .map(|s| s.sst_id)
92    }
93
94    pub fn level_iter<F: FnMut(&Level) -> bool>(
95        &self,
96        compaction_group_id: CompactionGroupId,
97        mut f: F,
98    ) {
99        if let Some(levels) = self.levels.get(&compaction_group_id) {
100            for sub_level in &levels.l0.sub_levels {
101                if !f(sub_level) {
102                    return;
103                }
104            }
105            for level in &levels.levels {
106                if !f(level) {
107                    return;
108                }
109            }
110        }
111    }
112
113    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
114        // l0 is currently separated from all levels
115        self.levels
116            .get(&compaction_group_id)
117            .map(|group| group.levels.len() + 1)
118            .unwrap_or(0)
119    }
120
121    pub fn safe_epoch_table_watermarks(
122        &self,
123        existing_table_ids: &[TableId],
124    ) -> BTreeMap<TableId, TableWatermarks> {
125        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
126    }
127}
128
129pub fn safe_epoch_table_watermarks_impl(
130    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
131    existing_table_ids: &[TableId],
132) -> BTreeMap<TableId, TableWatermarks> {
133    fn extract_single_table_watermark(
134        table_watermarks: &TableWatermarks,
135    ) -> Option<TableWatermarks> {
136        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
137            Some(TableWatermarks {
138                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
139                direction: table_watermarks.direction,
140                watermark_type: table_watermarks.watermark_type,
141            })
142        } else {
143            None
144        }
145    }
146    table_watermarks
147        .iter()
148        .filter_map(|(table_id, table_watermarks)| {
149            if !existing_table_ids.contains(table_id) {
150                None
151            } else {
152                extract_single_table_watermark(table_watermarks)
153                    .map(|table_watermarks| (*table_id, table_watermarks))
154            }
155        })
156        .collect()
157}
158
159pub fn safe_epoch_read_table_watermarks_impl(
160    safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
161) -> BTreeMap<TableId, ReadTableWatermark> {
162    safe_epoch_watermarks
163        .into_iter()
164        .map(|(table_id, watermarks)| {
165            assert_eq!(watermarks.watermarks.len(), 1);
166            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
167            let mut vnode_watermark_map = BTreeMap::new();
168            for vnode_watermark in vnode_watermarks.iter() {
169                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
170                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
171                    assert!(
172                        vnode_watermark_map
173                            .insert(vnode, watermark.clone())
174                            .is_none(),
175                        "duplicate table watermark on vnode {}",
176                        vnode.to_index()
177                    );
178                }
179            }
180            (
181                table_id,
182                ReadTableWatermark {
183                    direction: watermarks.direction,
184                    vnode_watermarks: vnode_watermark_map,
185                },
186            )
187        })
188        .collect()
189}
190
191impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
192    pub fn count_new_ssts_in_group_split(
193        &self,
194        parent_group_id: CompactionGroupId,
195        split_key: Bytes,
196    ) -> u64 {
197        self.levels
198            .get(&parent_group_id)
199            .map_or(0, |parent_levels| {
200                let l0 = &parent_levels.l0;
201                let mut split_count = 0;
202                for sub_level in &l0.sub_levels {
203                    assert!(!sub_level.table_infos.is_empty());
204
205                    if sub_level.level_type == PbLevelType::Overlapping {
206                        // TODO: use table_id / vnode / key_range filter
207                        split_count += sub_level
208                            .table_infos
209                            .iter()
210                            .map(|sst| {
211                                if let group_split::SstSplitType::Both =
212                                    group_split::need_to_split(sst, split_key.clone())
213                                {
214                                    2
215                                } else {
216                                    0
217                                }
218                            })
219                            .sum::<u64>();
220                        continue;
221                    }
222
223                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
224                    let sst = sub_level.table_infos.get(pos).unwrap();
225
226                    if let group_split::SstSplitType::Both =
227                        group_split::need_to_split(sst, split_key.clone())
228                    {
229                        split_count += 2;
230                    }
231                }
232
233                for level in &parent_levels.levels {
234                    if level.table_infos.is_empty() {
235                        continue;
236                    }
237                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
238                    let sst = level.table_infos.get(pos).unwrap();
239                    if let group_split::SstSplitType::Both =
240                        group_split::need_to_split(sst, split_key.clone())
241                    {
242                        split_count += 2;
243                    }
244                }
245
246                split_count
247            })
248    }
249
    /// Legacy (pre-`SplitGroupByTableId`) way of populating a newly constructed compaction group:
    /// moves the SSTs of `member_table_ids` out of `parent_group_id` into `group_id`.
    ///
    /// * If `parent_group_id` is `StaticCompactionGroupId::NewCompactionGroup`, there is nothing
    ///   to move and the function returns immediately. A non-zero `new_sst_start_id` then panics
    ///   in debug builds and is only logged as a warning otherwise.
    /// * Otherwise, both groups get their `compaction_group_version_id` bumped. Each L0 sub-level
    ///   and each non-L0 level of the parent is passed through `split_sst_info_for_level`, and
    ///   the SSTs it returns are added to the matching sub-level / level of `group_id`.
    ///
    /// `new_sst_id` starts at `new_sst_start_id` and is handed to `split_sst_info_for_level`,
    /// presumably to allocate ids for the split SSTs — confirm against that helper.
    ///
    /// # Panics
    ///
    /// Panics in any of these cases:
    /// * the parent group does not exist;
    /// * `group_id` has no entry in `self.levels`;
    /// * presumably, `group_id` equals `parent_group_id`, since `get_disjoint_mut` rejects
    ///   overlapping keys;
    /// * a non-L0 level of the child is no longer concatenable;
    /// * either group is left with an empty L0 sub-level.
    pub fn init_with_parent_group(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        member_table_ids: BTreeSet<StateTableId>,
        new_sst_start_id: HummockSstableId,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from",
                parent_group_id
            );
        }
        // Borrow the parent and the child levels mutably at the same time.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Remove SST from sub level may result in empty sub level. It will be purged
                // whenever another compaction task is finished.
                // NOTE(review): the assertions at the end of this function require that no L0
                // sub-level is empty, so `l0.normalize()` below presumably drops empty
                // sub-levels already — confirm.
                let insert_table_infos =
                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
                sub_level.normalize();
                if insert_table_infos.is_empty() {
                    continue;
                }
                // `Ok(idx)`: append to the child's existing sub-level at `idx`;
                // `Err(idx)`: create a new sub-level (same id and type as the parent's) at `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }
            l0.normalize();
        }
        // NOTE(review): the child's levels are indexed by the parent's level position, which
        // assumes both groups have at least as many non-L0 levels — confirm.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos =
                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Keep the child level sorted by key range so it stays concatenable.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level.normalize();
        }

        // Neither group may be left with an empty L0 sub-level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
352
353    pub fn build_sst_delta_infos(
354        &self,
355        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
356    ) -> Vec<SstDeltaInfo> {
357        let mut infos = vec![];
358
359        // Skip trivial move delta for refiller
360        // The trivial move task only changes the position of the sst in the lsm, it does not modify the object information corresponding to the sst, and does not need to re-execute the refill.
361        if version_delta.trivial_move {
362            return infos;
363        }
364
365        for (group_id, group_deltas) in &version_delta.group_deltas {
366            let mut info = SstDeltaInfo::default();
367
368            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
369            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
370
371            // Build only if all deltas are intra level deltas.
372            if !group_deltas.group_deltas.iter().all(|delta| {
373                matches!(
374                    delta,
375                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
376                )
377            }) {
378                continue;
379            }
380
381            // TODO(MrCroxx): At most one insert delta is allowed here. It's okay for now with the
382            // current `hummock::manager::gen_version_delta` implementation. Better refactor the
383            // struct to reduce conventions.
384            for group_delta in &group_deltas.group_deltas {
385                match group_delta {
386                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
387                        if !inserted_table_infos.is_empty() {
388                            info.insert_sst_level = 0;
389                            info.insert_sst_infos
390                                .extend(inserted_table_infos.iter().cloned());
391                        }
392                    }
393                    GroupDeltaCommon::IntraLevel(intra_level) => {
394                        if !intra_level.inserted_table_infos.is_empty() {
395                            info.insert_sst_level = intra_level.level_idx;
396                            info.insert_sst_infos
397                                .extend(intra_level.inserted_table_infos.iter().cloned());
398                        }
399                        if !intra_level.removed_table_ids.is_empty() {
400                            for id in &intra_level.removed_table_ids {
401                                if intra_level.level_idx == 0 {
402                                    removed_l0_ssts.insert(*id);
403                                } else {
404                                    removed_ssts
405                                        .entry(intra_level.level_idx)
406                                        .or_default()
407                                        .insert(*id);
408                                }
409                            }
410                        }
411                    }
412                    GroupDeltaCommon::GroupConstruct(_)
413                    | GroupDeltaCommon::GroupDestroy(_)
414                    | GroupDeltaCommon::GroupMerge(_)
415                    | GroupDeltaCommon::TruncateTables(_) => {}
416                }
417            }
418
419            let group = self.levels.get(group_id).unwrap();
420            for l0_sub_level in &group.level0().sub_levels {
421                for sst_info in &l0_sub_level.table_infos {
422                    if removed_l0_ssts.remove(&sst_info.sst_id) {
423                        info.delete_sst_object_ids.push(sst_info.object_id);
424                    }
425                }
426            }
427            for level in &group.levels {
428                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
429                    for sst_info in &level.table_infos {
430                        if removed_level_ssts.remove(&sst_info.sst_id) {
431                            info.delete_sst_object_ids.push(sst_info.object_id);
432                        }
433                    }
434                    if !removed_level_ssts.is_empty() {
435                        tracing::error!(
436                            "removed_level_ssts is not empty: {:?}",
437                            removed_level_ssts,
438                        );
439                    }
440                    debug_assert!(removed_level_ssts.is_empty());
441                }
442            }
443
444            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
445                tracing::error!(
446                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
447                    removed_l0_ssts,
448                    removed_ssts
449                );
450            }
451            debug_assert!(removed_l0_ssts.is_empty());
452            debug_assert!(removed_ssts.is_empty());
453
454            infos.push(info);
455        }
456
457        infos
458    }
459
460    pub fn apply_version_delta(
461        &mut self,
462        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
463    ) -> HashMap<TableId, Option<StateTableInfo>> {
464        assert_eq!(self.id, version_delta.prev_id);
465
466        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
467            &version_delta.state_table_info_delta,
468            &version_delta.removed_table_ids,
469        );
470
471        #[expect(deprecated)]
472        {
473            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
474                is_commit_epoch = true;
475                tracing::trace!(
476                    "max committed epoch bumped but no table committed epoch is changed"
477                );
478            }
479        }
480
481        // apply to `levels`, which is different compaction groups
482        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
483            let mut is_applied_l0_compact = false;
484            for group_delta in &group_deltas.group_deltas {
485                match group_delta {
486                    GroupDeltaCommon::GroupConstruct(group_construct) => {
487                        let mut new_levels = build_initial_compaction_group_levels(
488                            *compaction_group_id,
489                            group_construct.get_group_config().unwrap(),
490                        );
491                        let parent_group_id = group_construct.parent_group_id;
492                        new_levels.parent_group_id = parent_group_id;
493                        #[expect(deprecated)]
494                        // for backward-compatibility of previous hummock version delta
495                        new_levels
496                            .member_table_ids
497                            .clone_from(&group_construct.table_ids);
498                        self.levels.insert(*compaction_group_id, new_levels);
499                        let member_table_ids = if group_construct.version()
500                            >= CompatibilityVersion::NoMemberTableIds
501                        {
502                            self.state_table_info
503                                .compaction_group_member_table_ids(*compaction_group_id)
504                                .iter()
505                                .copied()
506                                .collect()
507                        } else {
508                            #[expect(deprecated)]
509                            // for backward-compatibility of previous hummock version delta
510                            BTreeSet::from_iter(
511                                group_construct.table_ids.iter().copied().map(Into::into),
512                            )
513                        };
514
515                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
516                            let split_key = if group_construct.split_key.is_some() {
517                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
518                            } else {
519                                None
520                            };
521                            self.init_with_parent_group_v2(
522                                parent_group_id,
523                                *compaction_group_id,
524                                group_construct.new_sst_start_id,
525                                split_key.clone(),
526                            );
527                        } else {
528                            // for backward-compatibility of previous hummock version delta
529                            self.init_with_parent_group(
530                                parent_group_id,
531                                *compaction_group_id,
532                                member_table_ids,
533                                group_construct.new_sst_start_id,
534                            );
535                        }
536                    }
537                    GroupDeltaCommon::GroupMerge(group_merge) => {
538                        tracing::info!(
539                            "group_merge left {:?} right {:?}",
540                            group_merge.left_group_id,
541                            group_merge.right_group_id
542                        );
543                        self.merge_compaction_group(
544                            group_merge.left_group_id,
545                            group_merge.right_group_id,
546                        )
547                    }
548                    GroupDeltaCommon::IntraLevel(level_delta) => {
549                        let levels =
550                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
551                                panic!("compaction group {} does not exist", compaction_group_id)
552                            });
553                        if is_commit_epoch {
554                            assert!(
555                                level_delta.removed_table_ids.is_empty(),
556                                "no sst should be deleted when committing an epoch"
557                            );
558
559                            let IntraLevelDelta {
560                                level_idx,
561                                l0_sub_level_id,
562                                inserted_table_infos,
563                                ..
564                            } = level_delta;
565                            {
566                                assert_eq!(
567                                    *level_idx, 0,
568                                    "we should only add to L0 when we commit an epoch."
569                                );
570                                if !inserted_table_infos.is_empty() {
571                                    insert_new_sub_level(
572                                        &mut levels.l0,
573                                        *l0_sub_level_id,
574                                        PbLevelType::Overlapping,
575                                        inserted_table_infos.clone(),
576                                        None,
577                                    );
578                                }
579                            }
580                        } else {
581                            // The delta is caused by compaction.
582                            levels.apply_compact_ssts(
583                                level_delta,
584                                self.state_table_info
585                                    .compaction_group_member_table_ids(*compaction_group_id),
586                            );
587                            if level_delta.level_idx == 0 {
588                                is_applied_l0_compact = true;
589                            }
590                        }
591                    }
592                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
593                        let levels =
594                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
595                                panic!("compaction group {} does not exist", compaction_group_id)
596                            });
597                        assert!(is_commit_epoch);
598
599                        if !inserted_table_infos.is_empty() {
600                            let next_l0_sub_level_id = levels
601                                .l0
602                                .sub_levels
603                                .last()
604                                .map(|level| level.sub_level_id + 1)
605                                .unwrap_or(1);
606
607                            insert_new_sub_level(
608                                &mut levels.l0,
609                                next_l0_sub_level_id,
610                                PbLevelType::Overlapping,
611                                inserted_table_infos.clone(),
612                                None,
613                            );
614                        }
615                    }
616                    GroupDeltaCommon::GroupDestroy(_) => {
617                        self.levels.remove(compaction_group_id);
618                    }
619
620                    GroupDeltaCommon::TruncateTables(table_ids) => {
621                        self.levels
622                            .get_mut(compaction_group_id)
623                            .unwrap_or_else(|| {
624                                panic!("compaction group {} does not exist", compaction_group_id)
625                            })
626                            .truncate_tables(table_ids);
627                    }
628                }
629            }
630
631            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
632            {
633                levels.l0.normalize();
634            }
635        }
636        self.id = version_delta.id;
637        #[expect(deprecated)]
638        {
639            self.max_committed_epoch = version_delta.max_committed_epoch;
640        }
641
642        // apply to table watermark
643
644        // Store the table watermarks that needs to be updated. None means to remove the table watermark of the table id
645        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
646            HashMap::new();
647
648        // apply to table watermark
649        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
650            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
651                if version_delta.removed_table_ids.contains(table_id) {
652                    modified_table_watermarks.insert(*table_id, None);
653                } else {
654                    let mut current_table_watermarks = (**current_table_watermarks).clone();
655                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
656                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
657                }
658            } else {
659                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
660            }
661        }
662        for (table_id, table_watermarks) in &self.table_watermarks {
663            let safe_epoch = if let Some(state_table_info) =
664                self.state_table_info.info().get(table_id)
665                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
666                && state_table_info.committed_epoch > *oldest_epoch
667            {
668                // safe epoch has progressed, need further clear.
669                state_table_info.committed_epoch
670            } else {
671                // safe epoch not progressed or the table has been removed. No need to truncate
672                continue;
673            };
674            let table_watermarks = modified_table_watermarks
675                .entry(*table_id)
676                .or_insert_with(|| Some((**table_watermarks).clone()));
677            if let Some(table_watermarks) = table_watermarks {
678                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
679            }
680        }
681        // apply the staging table watermark to hummock version
682        for (table_id, table_watermarks) in modified_table_watermarks {
683            if let Some(table_watermarks) = table_watermarks {
684                self.table_watermarks
685                    .insert(table_id, Arc::new(table_watermarks));
686            } else {
687                self.table_watermarks.remove(&table_id);
688            }
689        }
690        // apply to vector index
691        apply_vector_index_delta(
692            &mut self.vector_indexes,
693            &version_delta.vector_index_delta,
694            &version_delta.removed_table_ids,
695        );
696
697        changed_table_info
698    }
699
700    pub fn apply_change_log_delta<T: Clone>(
701        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
702        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
703    ) {
704        for (table_id, change_log_delta) in change_log_delta {
705            let new_change_log = &change_log_delta.new_log;
706            match table_change_log.entry(*table_id) {
707                Entry::Occupied(entry) => {
708                    let change_log = entry.into_mut();
709                    change_log.add_change_log(new_change_log.clone());
710                }
711                Entry::Vacant(entry) => {
712                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
713                }
714            };
715        }
716
717        // truncate the remaining table change log
718        for (table_id, change_log_delta) in change_log_delta {
719            if let Some(change_log) = table_change_log.get_mut(table_id) {
720                change_log.truncate(change_log_delta.truncate_epoch);
721            }
722        }
723    }
724
725    /// Returns the log deltas required to truncate the entire change log for a table.
726    pub fn collect_gc_change_log_delta<'a, T: Clone>(
727        current_change_log_table_ids: impl Iterator<Item = &'a TableId>,
728        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
729        removed_table_ids: &HashSet<TableId>,
730        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
731        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
732    ) -> HashSet<TableId> {
733        let mut gc_change_log_delta = HashSet::new();
734        // If a table has no new change log entry (even an empty one), it means we have stopped maintained
735        // the change log for the table, and then we will remove the table change log.
736        // The table change log will also be removed when the table id is removed.
737        for table_id in current_change_log_table_ids {
738            if removed_table_ids.contains(table_id) {
739                gc_change_log_delta.insert(*table_id);
740                continue;
741            }
742            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
743                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
744                && table_info_delta.committed_epoch > prev_table_info.committed_epoch
745            {
746                // the table exists previously, and its committed epoch has progressed.
747            } else {
748                // otherwise, the table change log should be kept anyway
749                continue;
750            }
751            let contains = change_log_delta.contains_key(table_id);
752            if !contains {
753                gc_change_log_delta.insert(*table_id);
754                static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
755                    LazyLock::new(|| LogSuppressor::per_second(1));
756                if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
757                    warn!(
758                        suppressed_count,
759                        %table_id,
760                        "table change log dropped due to no further change log at newly committed epoch"
761                    );
762                }
763            }
764        }
765        gc_change_log_delta
766    }
767
768    pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
769        let mut ret: BTreeMap<_, _> = BTreeMap::new();
770        for (compaction_group_id, group) in &self.levels {
771            let mut levels = vec![];
772            levels.extend(group.l0.sub_levels.iter());
773            levels.extend(group.levels.iter());
774            for level in levels {
775                for table_info in &level.table_infos {
776                    if table_info.sst_id.as_raw_id() == table_info.object_id.as_raw_id() {
777                        continue;
778                    }
779                    let object_id = table_info.object_id;
780                    let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
781                    entry
782                        .entry(*compaction_group_id)
783                        .or_default()
784                        .push(table_info.sst_id)
785                }
786            }
787        }
788        ret
789    }
790
791    pub fn merge_compaction_group(
792        &mut self,
793        left_group_id: CompactionGroupId,
794        right_group_id: CompactionGroupId,
795    ) {
796        // Double check
797        let left_group_id_table_ids = self
798            .state_table_info
799            .compaction_group_member_table_ids(left_group_id)
800            .iter();
801        let right_group_id_table_ids = self
802            .state_table_info
803            .compaction_group_member_table_ids(right_group_id)
804            .iter();
805
806        assert!(
807            left_group_id_table_ids
808                .chain(right_group_id_table_ids)
809                .is_sorted()
810        );
811
812        let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
813        let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
814            panic!(
815                "compaction group should exist right {} all {:?}",
816                right_group_id, total_cg
817            )
818        });
819
820        let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
821            panic!(
822                "compaction group should exist left {} all {:?}",
823                left_group_id, total_cg
824            )
825        });
826
827        group_split::merge_levels(left_levels, right_levels);
828    }
829
    /// Initializes compaction group `group_id` from its parent `parent_group_id`.
    ///
    /// The SSTs that `group_split::split_sst_info_for_level_v2` splits off at `split_key` are
    /// moved into the child. The split helper advances `new_sst_id`, which starts at
    /// `new_sst_start_id`.
    ///
    /// - Split-off L0 SSTs are placed into the child's L0 at the position given by
    ///   `group_split::get_sub_level_insert_hint`. They either join an existing sub-level or
    ///   form a new sub-level with the parent sub-level's id and type.
    /// - Split-off SSTs of a non-L0 level are added to the child's level at the same index and
    ///   re-sorted by key range.
    ///
    /// When `split_key` is `None`, nothing is moved, but the `compaction_group_version_id` of
    /// both groups is still bumped.
    ///
    /// If `parent_group_id` is `StaticCompactionGroupId::NewCompactionGroup`, this returns
    /// immediately. In that case a non-zero `new_sst_start_id` panics in debug builds and only
    /// logs a warning otherwise.
    ///
    /// # Panics
    /// - if `parent_group_id` (via `unreachable!`) or `group_id` is absent from `self.levels`;
    /// - NOTE(review): presumably also when `parent_group_id == group_id`, since
    ///   `get_disjoint_mut` rejects overlapping keys — confirm for the map type in use;
    /// - if the child has fewer non-L0 levels than a parent level that receives a split;
    /// - if a child non-L0 level is not concat-able after insertion, or either group ends up
    ///   with an empty L0 sub-level.
    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: HummockSstableId,
        split_key: Option<Bytes>,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Splitting may leave this parent sub-level empty; empty sub-levels are
                // purged by `l0.normalize()` once the loop finishes.
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                if insert_table_infos.is_empty() {
                    continue;
                }

                // Drop parent SSTs left without tables and refresh the sub-level's sizes.
                sub_level.normalize();
                // `Ok(idx)`: the child already has a matching sub-level at `idx` to join.
                // `Err(idx)`: a new sub-level must be created at position `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }
            // Remove emptied parent sub-levels and recompute the parent's L0 totals.
            l0.normalize();
        }

        // Non-L0 levels: the child level at the same index receives the split-off SSTs.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Non-L0 levels must stay ordered and non-overlapping by key range.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level.normalize();
        }

        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
952}
953
954impl<T> HummockVersionCommon<T>
955where
956    T: SstableIdReader + ObjectIdReader,
957{
958    pub fn get_object_ids(&self) -> impl Iterator<Item = HummockObjectId> + '_ {
959        // DO NOT REMOVE THIS LINE
960        // This is to ensure that when adding new variant to `HummockObjectId`,
961        // the compiler will warn us if we forget to handle it here.
962        match HummockObjectId::Sstable(0.into()) {
963            HummockObjectId::Sstable(_) => {}
964            HummockObjectId::VectorFile(_) => {}
965            HummockObjectId::HnswGraphFile(_) => {}
966        };
967        self.get_sst_infos()
968            .map(|s| HummockObjectId::Sstable(s.object_id()))
969            .chain(
970                self.vector_indexes
971                    .values()
972                    .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
973            )
974    }
975
976    pub fn get_sst_ids(&self) -> HashSet<HummockSstableId> {
977        self.get_sst_infos().map(|s| s.sst_id()).collect()
978    }
979
980    pub fn get_sst_infos(&self) -> impl Iterator<Item = &T> {
981        self.get_combined_levels()
982            .flat_map(|level| level.table_infos.iter())
983    }
984}
985
impl Levels {
    /// Applies one compaction-produced level delta to this group's levels.
    ///
    /// The delta is skipped, with a warning, when `is_compaction_task_expired` reports its
    /// `compaction_group_version_id` as expired relative to this group's current one.
    /// Otherwise:
    /// 1. SSTs whose ids are listed in the delta are deleted. When `level_idx == 0` they are
    ///    removed from every L0 sub-level; otherwise from `self.levels[level_idx - 1]`
    ///    (non-L0 `level_idx` is 1-based).
    /// 2. `inserted_table_infos` are added, via `level_insert_ssts`, either to the L0
    ///    sub-level whose id equals `l0_sub_level_id` (it must exist) or to
    ///    `self.levels[level_idx - 1]`.
    ///
    /// `vnode_partition_count` of the target is overwritten with the delta's value only when
    /// the target holds no SSTs after the deletions and every inserted SST carries exactly one
    /// table id. For L0 there are two extra conditions: the group must have exactly one member
    /// table, and that table must be the one every inserted SST carries. A non-L0 level with a
    /// non-zero count is reset to 0 when the delta's count is 0 and the group has more than one
    /// member table.
    ///
    /// Only per-level sizes are maintained here. The L0-wide `total_file_size` and
    /// `uncompressed_file_size` are not updated, and L0 sub-levels emptied by deletion are not
    /// removed. NOTE(review): presumably the caller normalizes L0 afterwards — confirm.
    ///
    /// # Panics
    /// If the target L0 sub-level or non-L0 level does not exist, or the target level is not
    /// concat-able after insertion.
    pub(crate) fn apply_compact_ssts(
        &mut self,
        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
        member_table_ids: &BTreeSet<TableId>,
    ) {
        // The delta's `removed_table_ids` field holds SST ids (it is matched against `sst_id`
        // by `Level::delete_ssts`), hence the local rename.
        let IntraLevelDeltaCommon {
            level_idx,
            l0_sub_level_id,
            inserted_table_infos: insert_table_infos,
            vnode_partition_count,
            removed_table_ids: delete_sst_ids_set,
            compaction_group_version_id,
        } = level_delta;
        let new_vnode_partition_count = *vnode_partition_count;

        // Skip deltas from compaction tasks considered expired for this group's current
        // version id.
        if is_compaction_task_expired(
            self.compaction_group_version_id,
            *compaction_group_version_id,
        ) {
            warn!(
                current_compaction_group_version_id = self.compaction_group_version_id,
                delta_compaction_group_version_id = compaction_group_version_id,
                level_idx,
                l0_sub_level_id,
                insert_table_infos = ?insert_table_infos
                    .iter()
                    .map(|sst| (sst.sst_id, sst.object_id))
                    .collect_vec(),
                ?delete_sst_ids_set,
                "This VersionDelta may be committed by an expired compact task. Please check it."
            );
            return;
        }
        if !delete_sst_ids_set.is_empty() {
            if *level_idx == 0 {
                // Inputs may live in any L0 sub-level; emptied sub-levels are left in place.
                for level in &mut self.l0.sub_levels {
                    level.delete_ssts(delete_sst_ids_set);
                }
            } else {
                let idx = *level_idx as usize - 1;
                self.levels[idx].delete_ssts(delete_sst_ids_set);
            }
        }

        if !insert_table_infos.is_empty() {
            let insert_sst_level_id = *level_idx;
            let insert_sub_level_id = *l0_sub_level_id;
            if insert_sst_level_id == 0 {
                let l0 = &mut self.l0;
                // Sub-levels are ordered by ascending `sub_level_id`, so binary-search the target.
                let index = l0
                    .sub_levels
                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
                assert!(
                    index < l0.sub_levels.len()
                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
                    "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
                    insert_sub_level_id,
                    delete_sst_ids_set,
                    l0.sub_levels
                        .iter()
                        .map(|level| level.sub_level_id)
                        .collect_vec()
                );
                if l0.sub_levels[index].table_infos.is_empty()
                    && member_table_ids.len() == 1
                    && insert_table_infos.iter().all(|sst| {
                        sst.table_ids.len() == 1
                            && sst.table_ids[0]
                                == *member_table_ids.iter().next().expect("non-empty")
                    })
                {
                    // Only change vnode_partition_count for group which has only one state-table.
                    // Only change vnode_partition_count for level which update all sst files in this compact task.
                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
                }
                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
            } else {
                let idx = insert_sst_level_id as usize - 1;
                if self.levels[idx].table_infos.is_empty()
                    && insert_table_infos
                        .iter()
                        .all(|sst| sst.table_ids.len() == 1)
                {
                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
                } else if self.levels[idx].vnode_partition_count != 0
                    && new_vnode_partition_count == 0
                    && member_table_ids.len() > 1
                {
                    // A multi-table group received unpartitioned output: drop the partitioning.
                    self.levels[idx].vnode_partition_count = 0;
                }
                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
            }
        }
    }

    /// Truncate specified table ids from all levels, remove emptied SSTs and sub-levels,
    /// then bump `compaction_group_version_id`.
    ///
    /// Bumping the version id makes `apply_compact_ssts` treat deltas carrying the previous
    /// id as expired (assuming `is_compaction_task_expired` compares the two ids).
    pub(crate) fn truncate_tables(&mut self, table_ids: &HashSet<TableId>) {
        for level in self.l0.sub_levels.iter_mut().chain(self.levels.iter_mut()) {
            level.truncate_tables(table_ids);
        }
        self.l0.normalize();
        self.compaction_group_version_id += 1;
    }
}
1092
1093impl<T, L> HummockVersionCommon<T, L> {
1094    pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1095        self.levels
1096            .values()
1097            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1098    }
1099}
1100
1101pub fn build_initial_compaction_group_levels(
1102    group_id: impl Into<CompactionGroupId>,
1103    compaction_config: &CompactionConfig,
1104) -> Levels {
1105    let mut levels = vec![];
1106    for l in 0..compaction_config.get_max_level() {
1107        levels.push(Level {
1108            level_idx: (l + 1) as u32,
1109            level_type: PbLevelType::Nonoverlapping,
1110            table_infos: vec![],
1111            total_file_size: 0,
1112            sub_level_id: 0,
1113            uncompressed_file_size: 0,
1114            vnode_partition_count: 0,
1115        });
1116    }
1117    #[expect(deprecated)] // for backward-compatibility of previous hummock version delta
1118    Levels {
1119        levels,
1120        l0: OverlappingLevel {
1121            sub_levels: vec![],
1122            total_file_size: 0,
1123            uncompressed_file_size: 0,
1124        },
1125        group_id: group_id.into(),
1126        parent_group_id: 0.into(),
1127        member_table_ids: vec![],
1128        compaction_group_version_id: 0,
1129    }
1130}
1131
1132fn split_sst_info_for_level(
1133    member_table_ids: &BTreeSet<TableId>,
1134    level: &mut Level,
1135    new_sst_id: &mut HummockSstableId,
1136) -> Vec<SstableInfo> {
1137    // Remove SST from sub level may result in empty sub level. It will be purged
1138    // whenever another compaction task is finished.
1139    let mut insert_table_infos = vec![];
1140    for sst_info in &mut level.table_infos {
1141        let removed_table_ids = sst_info
1142            .table_ids
1143            .iter()
1144            .filter(|table_id| member_table_ids.contains(*table_id))
1145            .cloned()
1146            .collect_vec();
1147        let sst_size = sst_info.sst_size;
1148        if sst_size / 2 == 0 {
1149            tracing::warn!(
1150                id = %sst_info.sst_id,
1151                object_id = %sst_info.object_id,
1152                sst_size = sst_info.sst_size,
1153                file_size = sst_info.file_size,
1154                "Sstable sst_size is under expected",
1155            );
1156        };
1157        if !removed_table_ids.is_empty() {
1158            let (modified_sst, branch_sst) = split_sst_with_table_ids(
1159                sst_info,
1160                new_sst_id,
1161                sst_size / 2,
1162                sst_size / 2,
1163                member_table_ids.iter().cloned().collect_vec(),
1164            );
1165            *sst_info = modified_sst;
1166            insert_table_infos.push(branch_sst);
1167        }
1168    }
1169    insert_table_infos
1170}
1171
1172/// Gets all compaction group ids.
1173pub fn get_compaction_group_ids(
1174    version: &HummockVersion,
1175) -> impl Iterator<Item = CompactionGroupId> + '_ {
1176    version.levels.keys().cloned()
1177}
1178
1179pub fn get_table_compaction_group_id_mapping(
1180    version: &HummockVersion,
1181) -> HashMap<StateTableId, CompactionGroupId> {
1182    version
1183        .state_table_info
1184        .info()
1185        .iter()
1186        .map(|(table_id, info)| (*table_id, info.compaction_group_id))
1187        .collect()
1188}
1189
1190/// Gets all SSTs in `group_id`
1191pub fn get_compaction_group_ssts(
1192    version: &HummockVersion,
1193    group_id: CompactionGroupId,
1194) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1195    let group_levels = version.get_compaction_group_levels(group_id);
1196    group_levels
1197        .l0
1198        .sub_levels
1199        .iter()
1200        .rev()
1201        .chain(group_levels.levels.iter())
1202        .flat_map(|level| {
1203            level
1204                .table_infos
1205                .iter()
1206                .map(|table_info| (table_info.object_id, table_info.sst_id))
1207        })
1208}
1209
1210pub fn new_sub_level(
1211    sub_level_id: u64,
1212    level_type: PbLevelType,
1213    table_infos: Vec<SstableInfo>,
1214) -> Level {
1215    if level_type == PbLevelType::Nonoverlapping {
1216        debug_assert!(
1217            can_concat(&table_infos),
1218            "sst of non-overlapping level is not concat-able: {:?}",
1219            table_infos
1220        );
1221    }
1222    let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1223    let uncompressed_file_size = table_infos
1224        .iter()
1225        .map(|table| table.uncompressed_file_size)
1226        .sum();
1227    Level {
1228        level_idx: 0,
1229        level_type,
1230        table_infos,
1231        total_file_size,
1232        sub_level_id,
1233        uncompressed_file_size,
1234        vnode_partition_count: 0,
1235    }
1236}
1237
1238pub fn add_ssts_to_sub_level(
1239    l0: &mut OverlappingLevel,
1240    sub_level_idx: usize,
1241    insert_table_infos: Vec<SstableInfo>,
1242) {
1243    insert_table_infos.iter().for_each(|sst| {
1244        l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1245        l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1246        l0.total_file_size += sst.sst_size;
1247        l0.uncompressed_file_size += sst.uncompressed_file_size;
1248    });
1249    l0.sub_levels[sub_level_idx]
1250        .table_infos
1251        .extend(insert_table_infos);
1252    if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1253        l0.sub_levels[sub_level_idx]
1254            .table_infos
1255            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1256        assert!(
1257            can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1258            "sstable ids: {:?}",
1259            l0.sub_levels[sub_level_idx]
1260                .table_infos
1261                .iter()
1262                .map(|sst| sst.sst_id)
1263                .collect_vec()
1264        );
1265    }
1266}
1267
1268/// `None` value of `sub_level_insert_hint` means append.
1269pub fn insert_new_sub_level(
1270    l0: &mut OverlappingLevel,
1271    insert_sub_level_id: u64,
1272    level_type: PbLevelType,
1273    insert_table_infos: Vec<SstableInfo>,
1274    sub_level_insert_hint: Option<usize>,
1275) {
1276    if insert_sub_level_id == u64::MAX {
1277        return;
1278    }
1279    let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1280        insert_pos
1281    } else {
1282        if let Some(newest_level) = l0.sub_levels.last() {
1283            assert!(
1284                newest_level.sub_level_id < insert_sub_level_id,
1285                "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1286                newest_level.sub_level_id,
1287                insert_sub_level_id,
1288                l0,
1289            );
1290        }
1291        l0.sub_levels.len()
1292    };
1293    #[cfg(debug_assertions)]
1294    {
1295        if insert_pos > 0
1296            && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1297        {
1298            debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1299        }
1300        if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1301            debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1302        }
1303    }
1304    // All files will be committed in one new Overlapping sub-level and become
1305    // Nonoverlapping  after at least one compaction.
1306    let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1307    l0.total_file_size += level.total_file_size;
1308    l0.uncompressed_file_size += level.uncompressed_file_size;
1309    l0.sub_levels.insert(insert_pos, level);
1310}
1311
1312impl Level {
1313    fn recompute_size(&mut self) {
1314        self.total_file_size = self
1315            .table_infos
1316            .iter()
1317            .map(|table| table.sst_size)
1318            .sum::<u64>();
1319        self.uncompressed_file_size = self
1320            .table_infos
1321            .iter()
1322            .map(|table| table.uncompressed_file_size)
1323            .sum::<u64>();
1324    }
1325
1326    /// Remove SSTs with empty `table_ids`, then recompute sizes.
1327    fn normalize(&mut self) {
1328        self.table_infos
1329            .retain(|sst_info| !sst_info.table_ids.is_empty());
1330        self.recompute_size();
1331    }
1332
1333    /// Return `true` if any SST was actually removed.
1334    fn delete_ssts(&mut self, ids: &HashSet<HummockSstableId>) -> bool {
1335        let original_len = self.table_infos.len();
1336        self.table_infos
1337            .retain(|table| !ids.contains(&table.sst_id));
1338        self.recompute_size();
1339        original_len != self.table_infos.len()
1340    }
1341
1342    /// Truncate specified `table_ids` from each SST's metadata,
1343    /// remove SSTs that become empty, then recompute sizes.
1344    fn truncate_tables(&mut self, table_ids: &HashSet<TableId>) {
1345        for sstable_info in &mut self.table_infos {
1346            let mut inner = sstable_info.get_inner();
1347            inner.table_ids.retain(|id| !table_ids.contains(id));
1348            sstable_info.set_inner(inner);
1349        }
1350        self.normalize();
1351    }
1352}
1353
1354impl OverlappingLevel {
1355    /// Remove empty sub-levels, then recompute aggregated sizes.
1356    fn normalize(&mut self) {
1357        self.sub_levels
1358            .retain(|level| !level.table_infos.is_empty());
1359        self.total_file_size = self
1360            .sub_levels
1361            .iter()
1362            .map(|level| level.total_file_size)
1363            .sum::<u64>();
1364        self.uncompressed_file_size = self
1365            .sub_levels
1366            .iter()
1367            .map(|level| level.uncompressed_file_size)
1368            .sum::<u64>();
1369    }
1370}
1371
1372fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1373    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1374        format!(
1375            "sstable ids: {:?}",
1376            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1377        )
1378    }
1379    operand.total_file_size += insert_table_infos
1380        .iter()
1381        .map(|sst| sst.sst_size)
1382        .sum::<u64>();
1383    operand.uncompressed_file_size += insert_table_infos
1384        .iter()
1385        .map(|sst| sst.uncompressed_file_size)
1386        .sum::<u64>();
1387    if operand.level_type == PbLevelType::Overlapping {
1388        operand.level_type = PbLevelType::Nonoverlapping;
1389        operand
1390            .table_infos
1391            .extend(insert_table_infos.iter().cloned());
1392        operand
1393            .table_infos
1394            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1395        assert!(
1396            can_concat(&operand.table_infos),
1397            "{}",
1398            display_sstable_infos(&operand.table_infos)
1399        );
1400    } else if !insert_table_infos.is_empty() {
1401        let sorted_insert: Vec<_> = insert_table_infos
1402            .iter()
1403            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1404            .cloned()
1405            .collect();
1406        let first = &sorted_insert[0];
1407        let last = &sorted_insert[sorted_insert.len() - 1];
1408        let pos = operand
1409            .table_infos
1410            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1411        if pos >= operand.table_infos.len()
1412            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1413        {
1414            operand.table_infos.splice(pos..pos, sorted_insert);
1415            // Validate the inserted SST batch along with the two SSTs that precede and follow it.
1416            let validate_range = operand
1417                .table_infos
1418                .iter()
1419                .skip(pos.saturating_sub(1))
1420                .take(insert_table_infos.len() + 2)
1421                .collect_vec();
1422            assert!(
1423                can_concat(&validate_range),
1424                "{}",
1425                display_sstable_infos(&validate_range),
1426            );
1427        } else {
1428            // If this branch is reached, it indicates some unexpected behavior in compaction.
1429            // Here we issue a warning and fall back to insert one by one.
1430            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1431            for i in insert_table_infos {
1432                let pos = operand
1433                    .table_infos
1434                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1435                operand.table_infos.insert(pos, i.clone());
1436            }
1437            assert!(
1438                can_concat(&operand.table_infos),
1439                "{}",
1440                display_sstable_infos(&operand.table_infos)
1441            );
1442        }
1443    }
1444}
1445
1446pub fn version_object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1447    // DO NOT REMOVE THIS LINE
1448    // This is to ensure that when adding new variant to `HummockObjectId`,
1449    // the compiler will warn us if we forget to handle it here.
1450    match HummockObjectId::Sstable(0.into()) {
1451        HummockObjectId::Sstable(_) => {}
1452        HummockObjectId::VectorFile(_) => {}
1453        HummockObjectId::HnswGraphFile(_) => {}
1454    };
1455    version
1456        .levels
1457        .values()
1458        .flat_map(|cg| {
1459            cg.level0()
1460                .sub_levels
1461                .iter()
1462                .chain(cg.levels.iter())
1463                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1464        })
1465        .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1466        .chain(
1467            version
1468                .vector_indexes
1469                .values()
1470                .flat_map(|index| index.get_objects()),
1471        )
1472        .collect()
1473}
1474
1475/// Verify the validity of a `HummockVersion` and return a list of violations if any.
1476/// Currently this method is only used by risectl validate-version.
1477pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1478    let mut res = Vec::new();
1479    // Ensure each table maps to only one compaction group
1480    for (group_id, levels) in &version.levels {
1481        // Ensure compaction group id matches
1482        if levels.group_id != *group_id {
1483            res.push(format!(
1484                "GROUP {}: inconsistent group id {} in Levels",
1485                group_id, levels.group_id
1486            ));
1487        }
1488
1489        let validate_level = |group: CompactionGroupId,
1490                              expected_level_idx: u32,
1491                              level: &Level,
1492                              res: &mut Vec<String>| {
1493            let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1494            if level.level_idx == 0 {
1495                level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1496                // Ensure sub-level is not empty
1497                if level.table_infos.is_empty() {
1498                    res.push(format!("{}: empty level", level_identifier));
1499                }
1500            } else if level.level_type != PbLevelType::Nonoverlapping {
1501                // Ensure non-L0 level is non-overlapping level
1502                res.push(format!(
1503                    "{}: level type {:?} is not non-overlapping",
1504                    level_identifier, level.level_type
1505                ));
1506            }
1507
1508            // Ensure level idx matches
1509            if level.level_idx != expected_level_idx {
1510                res.push(format!(
1511                    "{}: mismatched level idx {}",
1512                    level_identifier, expected_level_idx
1513                ));
1514            }
1515
1516            let mut prev_table_info: Option<&SstableInfo> = None;
1517            for table_info in &level.table_infos {
1518                // Ensure table_ids are sorted and unique
1519                if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1520                    res.push(format!(
1521                        "{} SST {}: table_ids not sorted",
1522                        level_identifier, table_info.object_id
1523                    ));
1524                }
1525
1526                // Ensure SSTs in non-overlapping level have non-overlapping key range
1527                if level.level_type == PbLevelType::Nonoverlapping {
1528                    if let Some(prev) = prev_table_info.take()
1529                        && prev
1530                            .key_range
1531                            .compare_right_with(&table_info.key_range.left)
1532                            != Ordering::Less
1533                    {
1534                        res.push(format!(
1535                            "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1536                            level_identifier, table_info.object_id, prev, table_info
1537                        ));
1538                    }
1539                    let _ = prev_table_info.insert(table_info);
1540                }
1541            }
1542        };
1543
1544        let l0 = &levels.l0;
1545        let mut prev_sub_level_id = u64::MAX;
1546        for sub_level in &l0.sub_levels {
1547            // Ensure sub_level_id is sorted and unique
1548            if sub_level.sub_level_id >= prev_sub_level_id {
1549                res.push(format!(
1550                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1551                    group_id, sub_level.level_idx, prev_sub_level_id
1552                ));
1553            }
1554            prev_sub_level_id = sub_level.sub_level_id;
1555
1556            validate_level(*group_id, 0, sub_level, &mut res);
1557        }
1558
1559        for idx in 1..=levels.levels.len() {
1560            validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1561        }
1562    }
1563    res
1564}
1565
1566#[cfg(test)]
1567mod tests {
1568    use std::collections::{HashMap, HashSet};
1569
1570    use bytes::Bytes;
1571    use risingwave_common::catalog::TableId;
1572    use risingwave_common::hash::VirtualNode;
1573    use risingwave_common::util::epoch::test_epoch;
1574    use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1575
1576    use super::group_split;
1577    use crate::HummockVersionId;
1578    use crate::compaction_group::group_split::*;
1579    use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1580    use crate::key::{FullKey, gen_key_from_str};
1581    use crate::key_range::KeyRange;
1582    use crate::level::{Level, Levels, OverlappingLevel};
1583    use crate::sstable_info::{SstableInfo, SstableInfoInner};
1584    use crate::version::{
1585        GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1586    };
1587
1588    fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1589        gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1590    }
1591
1592    fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1593        let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1594        let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1595        let full_key_l = FullKey::for_test(
1596            TableId::new(*table_ids.first().unwrap()),
1597            table_key_l,
1598            epoch,
1599        )
1600        .encode();
1601        let full_key_r =
1602            FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1603                .encode();
1604
1605        SstableInfoInner {
1606            sst_id: sst_id.into(),
1607            key_range: KeyRange {
1608                left: full_key_l.into(),
1609                right: full_key_r.into(),
1610                right_exclusive: false,
1611            },
1612            table_ids: table_ids.into_iter().map(Into::into).collect(),
1613            object_id: sst_id.into(),
1614            min_epoch: 20,
1615            max_epoch: 20,
1616            file_size: 100,
1617            sst_size: 100,
1618            ..Default::default()
1619        }
1620    }
1621
1622    #[test]
1623    fn test_get_sst_object_ids() {
1624        let mut version = HummockVersion {
1625            id: HummockVersionId::new(0),
1626            levels: HashMap::from_iter([(
1627                0.into(),
1628                Levels {
1629                    levels: vec![],
1630                    l0: OverlappingLevel {
1631                        sub_levels: vec![],
1632                        total_file_size: 0,
1633                        uncompressed_file_size: 0,
1634                    },
1635                    ..Default::default()
1636                },
1637            )]),
1638            ..Default::default()
1639        };
1640        assert_eq!(version.get_object_ids().count(), 0);
1641
1642        // Add to sub level
1643        version
1644            .levels
1645            .get_mut(&0)
1646            .unwrap()
1647            .l0
1648            .sub_levels
1649            .push(Level {
1650                table_infos: vec![
1651                    SstableInfoInner {
1652                        object_id: 11.into(),
1653                        sst_id: 11.into(),
1654                        ..Default::default()
1655                    }
1656                    .into(),
1657                ],
1658                ..Default::default()
1659            });
1660        assert_eq!(version.get_object_ids().count(), 1);
1661
1662        // Add to non sub level
1663        version.levels.get_mut(&0).unwrap().levels.push(Level {
1664            table_infos: vec![
1665                SstableInfoInner {
1666                    object_id: 22.into(),
1667                    sst_id: 22.into(),
1668                    ..Default::default()
1669                }
1670                .into(),
1671            ],
1672            ..Default::default()
1673        });
1674        assert_eq!(version.get_object_ids().count(), 2);
1675    }
1676
1677    #[test]
1678    fn test_apply_version_delta() {
1679        let mut version = HummockVersion {
1680            id: HummockVersionId::new(0),
1681            levels: HashMap::from_iter([
1682                (
1683                    0.into(),
1684                    build_initial_compaction_group_levels(
1685                        0,
1686                        &CompactionConfig {
1687                            max_level: 6,
1688                            ..Default::default()
1689                        },
1690                    ),
1691                ),
1692                (
1693                    1.into(),
1694                    build_initial_compaction_group_levels(
1695                        1,
1696                        &CompactionConfig {
1697                            max_level: 6,
1698                            ..Default::default()
1699                        },
1700                    ),
1701                ),
1702            ]),
1703            ..Default::default()
1704        };
1705        let version_delta = HummockVersionDelta {
1706            id: HummockVersionId::new(1),
1707            group_deltas: HashMap::from_iter([
1708                (
1709                    2.into(),
1710                    GroupDeltas {
1711                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1712                            group_config: Some(CompactionConfig {
1713                                max_level: 6,
1714                                ..Default::default()
1715                            }),
1716                            ..Default::default()
1717                        }))],
1718                    },
1719                ),
1720                (
1721                    0.into(),
1722                    GroupDeltas {
1723                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1724                    },
1725                ),
1726                (
1727                    1.into(),
1728                    GroupDeltas {
1729                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1730                            1,
1731                            0,
1732                            HashSet::new(),
1733                            vec![
1734                                SstableInfoInner {
1735                                    object_id: 1.into(),
1736                                    sst_id: 1.into(),
1737                                    ..Default::default()
1738                                }
1739                                .into(),
1740                            ],
1741                            0,
1742                            version
1743                                .levels
1744                                .get(&1)
1745                                .as_ref()
1746                                .unwrap()
1747                                .compaction_group_version_id,
1748                        ))],
1749                    },
1750                ),
1751            ]),
1752            ..Default::default()
1753        };
1754        let version_delta = version_delta;
1755
1756        version.apply_version_delta(&version_delta);
1757        let mut cg1 = build_initial_compaction_group_levels(
1758            1,
1759            &CompactionConfig {
1760                max_level: 6,
1761                ..Default::default()
1762            },
1763        );
1764        cg1.levels[0] = Level {
1765            level_idx: 1,
1766            level_type: LevelType::Nonoverlapping,
1767            table_infos: vec![
1768                SstableInfoInner {
1769                    object_id: 1.into(),
1770                    sst_id: 1.into(),
1771                    ..Default::default()
1772                }
1773                .into(),
1774            ],
1775            ..Default::default()
1776        };
1777        assert_eq!(
1778            version,
1779            HummockVersion {
1780                id: HummockVersionId::new(1),
1781                levels: HashMap::from_iter([
1782                    (
1783                        2.into(),
1784                        build_initial_compaction_group_levels(
1785                            2,
1786                            &CompactionConfig {
1787                                max_level: 6,
1788                                ..Default::default()
1789                            },
1790                        ),
1791                    ),
1792                    (1.into(), cg1),
1793                ]),
1794                ..Default::default()
1795            }
1796        );
1797    }
1798
1799    fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1800        gen_sst_info_impl(object_id, table_ids, left, right).into()
1801    }
1802
1803    fn gen_sst_info_impl(
1804        object_id: u64,
1805        table_ids: Vec<u32>,
1806        left: Bytes,
1807        right: Bytes,
1808    ) -> SstableInfoInner {
1809        SstableInfoInner {
1810            object_id: object_id.into(),
1811            sst_id: object_id.into(),
1812            key_range: KeyRange {
1813                left,
1814                right,
1815                right_exclusive: false,
1816            },
1817            table_ids: table_ids.into_iter().map(Into::into).collect(),
1818            file_size: 100,
1819            sst_size: 100,
1820            uncompressed_file_size: 100,
1821            ..Default::default()
1822        }
1823    }
1824
1825    #[test]
1826    fn test_merge_levels() {
1827        let mut left_levels = build_initial_compaction_group_levels(
1828            1,
1829            &CompactionConfig {
1830                max_level: 6,
1831                ..Default::default()
1832            },
1833        );
1834
1835        let mut right_levels = build_initial_compaction_group_levels(
1836            2,
1837            &CompactionConfig {
1838                max_level: 6,
1839                ..Default::default()
1840            },
1841        );
1842
1843        left_levels.levels[0] = Level {
1844            level_idx: 1,
1845            level_type: LevelType::Nonoverlapping,
1846            table_infos: vec![
1847                gen_sst_info(
1848                    1,
1849                    vec![3],
1850                    FullKey::for_test(
1851                        TableId::new(3),
1852                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1853                        0,
1854                    )
1855                    .encode()
1856                    .into(),
1857                    FullKey::for_test(
1858                        TableId::new(3),
1859                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1860                        0,
1861                    )
1862                    .encode()
1863                    .into(),
1864                ),
1865                gen_sst_info(
1866                    10,
1867                    vec![3, 4],
1868                    FullKey::for_test(
1869                        TableId::new(3),
1870                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1871                        0,
1872                    )
1873                    .encode()
1874                    .into(),
1875                    FullKey::for_test(
1876                        TableId::new(4),
1877                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1878                        0,
1879                    )
1880                    .encode()
1881                    .into(),
1882                ),
1883                gen_sst_info(
1884                    11,
1885                    vec![4],
1886                    FullKey::for_test(
1887                        TableId::new(4),
1888                        gen_key_from_str(VirtualNode::from_index(11), "1"),
1889                        0,
1890                    )
1891                    .encode()
1892                    .into(),
1893                    FullKey::for_test(
1894                        TableId::new(4),
1895                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1896                        0,
1897                    )
1898                    .encode()
1899                    .into(),
1900                ),
1901            ],
1902            total_file_size: 300,
1903            ..Default::default()
1904        };
1905
1906        left_levels.l0.sub_levels.push(Level {
1907            level_idx: 0,
1908            table_infos: vec![gen_sst_info(
1909                3,
1910                vec![3],
1911                FullKey::for_test(
1912                    TableId::new(3),
1913                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1914                    0,
1915                )
1916                .encode()
1917                .into(),
1918                FullKey::for_test(
1919                    TableId::new(3),
1920                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1921                    0,
1922                )
1923                .encode()
1924                .into(),
1925            )],
1926            sub_level_id: 101,
1927            level_type: LevelType::Overlapping,
1928            total_file_size: 100,
1929            ..Default::default()
1930        });
1931
1932        left_levels.l0.sub_levels.push(Level {
1933            level_idx: 0,
1934            table_infos: vec![gen_sst_info(
1935                3,
1936                vec![3],
1937                FullKey::for_test(
1938                    TableId::new(3),
1939                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1940                    0,
1941                )
1942                .encode()
1943                .into(),
1944                FullKey::for_test(
1945                    TableId::new(3),
1946                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1947                    0,
1948                )
1949                .encode()
1950                .into(),
1951            )],
1952            sub_level_id: 103,
1953            level_type: LevelType::Overlapping,
1954            total_file_size: 100,
1955            ..Default::default()
1956        });
1957
1958        left_levels.l0.sub_levels.push(Level {
1959            level_idx: 0,
1960            table_infos: vec![gen_sst_info(
1961                3,
1962                vec![3],
1963                FullKey::for_test(
1964                    TableId::new(3),
1965                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1966                    0,
1967                )
1968                .encode()
1969                .into(),
1970                FullKey::for_test(
1971                    TableId::new(3),
1972                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1973                    0,
1974                )
1975                .encode()
1976                .into(),
1977            )],
1978            sub_level_id: 105,
1979            level_type: LevelType::Nonoverlapping,
1980            total_file_size: 100,
1981            ..Default::default()
1982        });
1983
1984        right_levels.levels[0] = Level {
1985            level_idx: 1,
1986            level_type: LevelType::Nonoverlapping,
1987            table_infos: vec![
1988                gen_sst_info(
1989                    1,
1990                    vec![5],
1991                    FullKey::for_test(
1992                        TableId::new(5),
1993                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1994                        0,
1995                    )
1996                    .encode()
1997                    .into(),
1998                    FullKey::for_test(
1999                        TableId::new(5),
2000                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2001                        0,
2002                    )
2003                    .encode()
2004                    .into(),
2005                ),
2006                gen_sst_info(
2007                    10,
2008                    vec![5, 6],
2009                    FullKey::for_test(
2010                        TableId::new(5),
2011                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2012                        0,
2013                    )
2014                    .encode()
2015                    .into(),
2016                    FullKey::for_test(
2017                        TableId::new(6),
2018                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2019                        0,
2020                    )
2021                    .encode()
2022                    .into(),
2023                ),
2024                gen_sst_info(
2025                    11,
2026                    vec![6],
2027                    FullKey::for_test(
2028                        TableId::new(6),
2029                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2030                        0,
2031                    )
2032                    .encode()
2033                    .into(),
2034                    FullKey::for_test(
2035                        TableId::new(6),
2036                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2037                        0,
2038                    )
2039                    .encode()
2040                    .into(),
2041                ),
2042            ],
2043            total_file_size: 300,
2044            ..Default::default()
2045        };
2046
2047        right_levels.l0.sub_levels.push(Level {
2048            level_idx: 0,
2049            table_infos: vec![gen_sst_info(
2050                3,
2051                vec![5],
2052                FullKey::for_test(
2053                    TableId::new(5),
2054                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2055                    0,
2056                )
2057                .encode()
2058                .into(),
2059                FullKey::for_test(
2060                    TableId::new(5),
2061                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2062                    0,
2063                )
2064                .encode()
2065                .into(),
2066            )],
2067            sub_level_id: 101,
2068            level_type: LevelType::Overlapping,
2069            total_file_size: 100,
2070            ..Default::default()
2071        });
2072
2073        right_levels.l0.sub_levels.push(Level {
2074            level_idx: 0,
2075            table_infos: vec![gen_sst_info(
2076                5,
2077                vec![5],
2078                FullKey::for_test(
2079                    TableId::new(5),
2080                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2081                    0,
2082                )
2083                .encode()
2084                .into(),
2085                FullKey::for_test(
2086                    TableId::new(5),
2087                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2088                    0,
2089                )
2090                .encode()
2091                .into(),
2092            )],
2093            sub_level_id: 102,
2094            level_type: LevelType::Overlapping,
2095            total_file_size: 100,
2096            ..Default::default()
2097        });
2098
2099        right_levels.l0.sub_levels.push(Level {
2100            level_idx: 0,
2101            table_infos: vec![gen_sst_info(
2102                3,
2103                vec![5],
2104                FullKey::for_test(
2105                    TableId::new(5),
2106                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2107                    0,
2108                )
2109                .encode()
2110                .into(),
2111                FullKey::for_test(
2112                    TableId::new(5),
2113                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2114                    0,
2115                )
2116                .encode()
2117                .into(),
2118            )],
2119            sub_level_id: 103,
2120            level_type: LevelType::Nonoverlapping,
2121            total_file_size: 100,
2122            ..Default::default()
2123        });
2124
2125        {
2126            // test empty
2127            let mut left_levels = Levels::default();
2128            let right_levels = Levels::default();
2129
2130            group_split::merge_levels(&mut left_levels, right_levels);
2131        }
2132
2133        {
2134            // test empty left
2135            let mut left_levels = build_initial_compaction_group_levels(
2136                1,
2137                &CompactionConfig {
2138                    max_level: 6,
2139                    ..Default::default()
2140                },
2141            );
2142            let right_levels = right_levels.clone();
2143
2144            group_split::merge_levels(&mut left_levels, right_levels);
2145
2146            assert!(left_levels.l0.sub_levels.len() == 3);
2147            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2148            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2149            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2150            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2151            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2152            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2153
2154            assert!(left_levels.levels[0].level_idx == 1);
2155            assert_eq!(300, left_levels.levels[0].total_file_size);
2156        }
2157
2158        {
2159            // test empty right
2160            let mut left_levels = left_levels.clone();
2161            let right_levels = build_initial_compaction_group_levels(
2162                2,
2163                &CompactionConfig {
2164                    max_level: 6,
2165                    ..Default::default()
2166                },
2167            );
2168
2169            group_split::merge_levels(&mut left_levels, right_levels);
2170
2171            assert!(left_levels.l0.sub_levels.len() == 3);
2172            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2173            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2174            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2175            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2176            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2177            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2178
2179            assert!(left_levels.levels[0].level_idx == 1);
2180            assert_eq!(300, left_levels.levels[0].total_file_size);
2181        }
2182
2183        {
2184            let mut left_levels = left_levels.clone();
2185            let right_levels = right_levels.clone();
2186
2187            group_split::merge_levels(&mut left_levels, right_levels);
2188
2189            assert!(left_levels.l0.sub_levels.len() == 6);
2190            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2191            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2192            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2193            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2194            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2195            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2196            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2197            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2198            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2199            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2200            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2201            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2202
2203            assert!(left_levels.levels[0].level_idx == 1);
2204            assert_eq!(600, left_levels.levels[0].total_file_size);
2205        }
2206    }
2207
2208    #[test]
2209    fn test_get_split_pos() {
2210        let epoch = test_epoch(1);
2211        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2212        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2213        let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2214
2215        let ssts = vec![s1, s2, s3];
2216        let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2217
2218        let pos = group_split::get_split_pos(&ssts, split_key.clone());
2219        assert_eq!(1, pos);
2220
2221        let pos = group_split::get_split_pos(&vec![], split_key);
2222        assert_eq!(0, pos);
2223    }
2224
2225    #[test]
2226    fn test_split_sst() {
2227        let epoch = test_epoch(1);
2228        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2229
2230        {
2231            let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
2232            let origin_sst = sst.clone();
2233            let sst_size = origin_sst.sst_size;
2234            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2235            assert_eq!(SstSplitType::Both, split_type);
2236
2237            let mut new_sst_id = 10.into();
2238            let (origin_sst, branched_sst) = group_split::split_sst(
2239                origin_sst,
2240                &mut new_sst_id,
2241                split_key,
2242                sst_size / 2,
2243                sst_size / 2,
2244            );
2245
2246            let origin_sst = origin_sst.unwrap();
2247            let branched_sst = branched_sst.unwrap();
2248
2249            assert!(origin_sst.key_range.right_exclusive);
2250            assert!(
2251                origin_sst
2252                    .key_range
2253                    .right
2254                    .cmp(&branched_sst.key_range.left)
2255                    .is_le()
2256            );
2257            assert!(origin_sst.table_ids.is_sorted());
2258            assert!(branched_sst.table_ids.is_sorted());
2259            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2260            assert!(branched_sst.sst_size < origin_sst.file_size);
2261            assert_eq!(10, branched_sst.sst_id);
2262            assert_eq!(11, origin_sst.sst_id);
2263            assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2264        }
2265
2266        {
2267            // test un-exist table_id
2268            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2269            let origin_sst = sst.clone();
2270            let sst_size = origin_sst.sst_size;
2271            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2272            assert_eq!(SstSplitType::Both, split_type);
2273
2274            let mut new_sst_id = 10.into();
2275            let (origin_sst, branched_sst) = group_split::split_sst(
2276                origin_sst,
2277                &mut new_sst_id,
2278                split_key,
2279                sst_size / 2,
2280                sst_size / 2,
2281            );
2282
2283            let origin_sst = origin_sst.unwrap();
2284            let branched_sst = branched_sst.unwrap();
2285
2286            assert!(origin_sst.key_range.right_exclusive);
2287            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2288            assert!(origin_sst.table_ids.is_sorted());
2289            assert!(branched_sst.table_ids.is_sorted());
2290            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2291            assert!(branched_sst.sst_size < origin_sst.file_size);
2292            assert_eq!(10, branched_sst.sst_id);
2293            assert_eq!(11, origin_sst.sst_id);
2294            assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2295        }
2296
2297        {
2298            let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
2299            let split_type = group_split::need_to_split(&sst, split_key);
2300            assert_eq!(SstSplitType::Left, split_type);
2301        }
2302
2303        {
2304            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2305            let origin_sst = sst.clone();
2306            let split_type = group_split::need_to_split(&origin_sst, split_key);
2307            assert_eq!(SstSplitType::Both, split_type);
2308
2309            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2310            let origin_sst = sst;
2311            let split_type = group_split::need_to_split(&origin_sst, split_key);
2312            assert_eq!(SstSplitType::Right, split_type);
2313        }
2314
2315        {
2316            // test key_range left = right
2317            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2318            sst.key_range.right = sst.key_range.left.clone();
2319            let sst: SstableInfo = sst.into();
2320            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2321            let origin_sst = sst;
2322            let sst_size = origin_sst.sst_size;
2323
2324            let mut new_sst_id = 10.into();
2325            let (origin_sst, branched_sst) = group_split::split_sst(
2326                origin_sst,
2327                &mut new_sst_id,
2328                split_key,
2329                sst_size / 2,
2330                sst_size / 2,
2331            );
2332
2333            assert!(origin_sst.is_none());
2334            assert!(branched_sst.is_some());
2335        }
2336    }
2337
2338    #[test]
2339    fn test_split_sst_info_for_level() {
2340        let mut version = HummockVersion {
2341            id: HummockVersionId::new(0),
2342            levels: HashMap::from_iter([(
2343                1.into(),
2344                build_initial_compaction_group_levels(
2345                    1,
2346                    &CompactionConfig {
2347                        max_level: 6,
2348                        ..Default::default()
2349                    },
2350                ),
2351            )]),
2352            ..Default::default()
2353        };
2354
2355        let cg1 = version.levels.get_mut(&1).unwrap();
2356
2357        cg1.levels[0] = Level {
2358            level_idx: 1,
2359            level_type: LevelType::Nonoverlapping,
2360            table_infos: vec![
2361                gen_sst_info(
2362                    1,
2363                    vec![3],
2364                    FullKey::for_test(
2365                        TableId::new(3),
2366                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2367                        0,
2368                    )
2369                    .encode()
2370                    .into(),
2371                    FullKey::for_test(
2372                        TableId::new(3),
2373                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2374                        0,
2375                    )
2376                    .encode()
2377                    .into(),
2378                ),
2379                gen_sst_info(
2380                    10,
2381                    vec![3, 4],
2382                    FullKey::for_test(
2383                        TableId::new(3),
2384                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2385                        0,
2386                    )
2387                    .encode()
2388                    .into(),
2389                    FullKey::for_test(
2390                        TableId::new(4),
2391                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2392                        0,
2393                    )
2394                    .encode()
2395                    .into(),
2396                ),
2397                gen_sst_info(
2398                    11,
2399                    vec![4],
2400                    FullKey::for_test(
2401                        TableId::new(4),
2402                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2403                        0,
2404                    )
2405                    .encode()
2406                    .into(),
2407                    FullKey::for_test(
2408                        TableId::new(4),
2409                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2410                        0,
2411                    )
2412                    .encode()
2413                    .into(),
2414                ),
2415            ],
2416            total_file_size: 300,
2417            ..Default::default()
2418        };
2419
2420        cg1.l0.sub_levels.push(Level {
2421            level_idx: 0,
2422            table_infos: vec![
2423                gen_sst_info(
2424                    2,
2425                    vec![2],
2426                    FullKey::for_test(
2427                        TableId::new(0),
2428                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2429                        0,
2430                    )
2431                    .encode()
2432                    .into(),
2433                    FullKey::for_test(
2434                        TableId::new(2),
2435                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2436                        0,
2437                    )
2438                    .encode()
2439                    .into(),
2440                ),
2441                gen_sst_info(
2442                    22,
2443                    vec![2],
2444                    FullKey::for_test(
2445                        TableId::new(0),
2446                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2447                        0,
2448                    )
2449                    .encode()
2450                    .into(),
2451                    FullKey::for_test(
2452                        TableId::new(2),
2453                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2454                        0,
2455                    )
2456                    .encode()
2457                    .into(),
2458                ),
2459                gen_sst_info(
2460                    23,
2461                    vec![2],
2462                    FullKey::for_test(
2463                        TableId::new(0),
2464                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2465                        0,
2466                    )
2467                    .encode()
2468                    .into(),
2469                    FullKey::for_test(
2470                        TableId::new(2),
2471                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2472                        0,
2473                    )
2474                    .encode()
2475                    .into(),
2476                ),
2477                gen_sst_info(
2478                    24,
2479                    vec![2],
2480                    FullKey::for_test(
2481                        TableId::new(2),
2482                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2483                        0,
2484                    )
2485                    .encode()
2486                    .into(),
2487                    FullKey::for_test(
2488                        TableId::new(2),
2489                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2490                        0,
2491                    )
2492                    .encode()
2493                    .into(),
2494                ),
2495                gen_sst_info(
2496                    25,
2497                    vec![2],
2498                    FullKey::for_test(
2499                        TableId::new(0),
2500                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2501                        0,
2502                    )
2503                    .encode()
2504                    .into(),
2505                    FullKey::for_test(
2506                        TableId::new(0),
2507                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2508                        0,
2509                    )
2510                    .encode()
2511                    .into(),
2512                ),
2513            ],
2514            sub_level_id: 101,
2515            level_type: LevelType::Overlapping,
2516            total_file_size: 300,
2517            ..Default::default()
2518        });
2519
2520        {
2521            // split Overlapping level
2522            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2523
2524            let mut new_sst_id = 100.into();
2525            let x = group_split::split_sst_info_for_level_v2(
2526                &mut cg1.l0.sub_levels[0],
2527                &mut new_sst_id,
2528                split_key,
2529            );
2530            // assert_eq!(3, x.len());
2531            // assert_eq!(100, x[0].sst_id);
2532            // assert_eq!(100, x[0].sst_size);
2533            // assert_eq!(101, x[1].sst_id);
2534            // assert_eq!(100, x[1].sst_size);
2535            // assert_eq!(102, x[2].sst_id);
2536            // assert_eq!(100, x[2].sst_size);
2537
2538            let mut right_l0 = OverlappingLevel {
2539                sub_levels: vec![],
2540                total_file_size: 0,
2541                uncompressed_file_size: 0,
2542            };
2543
2544            right_l0.sub_levels.push(Level {
2545                level_idx: 0,
2546                table_infos: x,
2547                sub_level_id: 101,
2548                total_file_size: 100,
2549                level_type: LevelType::Overlapping,
2550                ..Default::default()
2551            });
2552
2553            let right_levels = Levels {
2554                levels: vec![],
2555                l0: right_l0,
2556                ..Default::default()
2557            };
2558
2559            merge_levels(cg1, right_levels);
2560        }
2561
2562        {
2563            // test split empty level
2564            let mut new_sst_id = 100.into();
2565            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2566            let x = group_split::split_sst_info_for_level_v2(
2567                &mut cg1.levels[2],
2568                &mut new_sst_id,
2569                split_key,
2570            );
2571
2572            assert!(x.is_empty());
2573        }
2574
2575        {
2576            // test split to right Nonoverlapping level
2577            let mut cg1 = cg1.clone();
2578            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2579
2580            let mut new_sst_id = 100.into();
2581            let x = group_split::split_sst_info_for_level_v2(
2582                &mut cg1.levels[0],
2583                &mut new_sst_id,
2584                split_key,
2585            );
2586
2587            assert_eq!(3, x.len());
2588            assert_eq!(1, x[0].sst_id);
2589            assert_eq!(100, x[0].sst_size);
2590            assert_eq!(10, x[1].sst_id);
2591            assert_eq!(100, x[1].sst_size);
2592            assert_eq!(11, x[2].sst_id);
2593            assert_eq!(100, x[2].sst_size);
2594
2595            assert_eq!(0, cg1.levels[0].table_infos.len());
2596        }
2597
2598        {
2599            // test split to left Nonoverlapping level
2600            let mut cg1 = cg1.clone();
2601            let split_key = group_split::build_split_key(5.into(), VirtualNode::ZERO);
2602
2603            let mut new_sst_id = 100.into();
2604            let x = group_split::split_sst_info_for_level_v2(
2605                &mut cg1.levels[0],
2606                &mut new_sst_id,
2607                split_key,
2608            );
2609
2610            assert_eq!(0, x.len());
2611            assert_eq!(3, cg1.levels[0].table_infos.len());
2612        }
2613
2614        // {
2615        //     // test split to both Nonoverlapping level
2616        //     let mut cg1 = cg1.clone();
2617        //     let split_key = build_split_key(3, VirtualNode::MAX);
2618
2619        //     let mut new_sst_id = 100;
2620        //     let x = group_split::split_sst_info_for_level_v2(
2621        //         &mut cg1.levels[0],
2622        //         &mut new_sst_id,
2623        //         split_key,
2624        //     );
2625
2626        //     assert_eq!(2, x.len());
2627        //     assert_eq!(100, x[0].sst_id);
2628        //     assert_eq!(100 / 2, x[0].sst_size);
2629        //     assert_eq!(11, x[1].sst_id);
2630        //     assert_eq!(100, x[1].sst_size);
2631        //     assert_eq!(vec![3, 4], x[0].table_ids);
2632
2633        //     assert_eq!(2, cg1.levels[0].table_infos.len());
2634        //     assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2635        //     assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2636        //     assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2637        // }
2638
2639        {
2640            // test split to both Nonoverlapping level
2641            let mut cg1 = cg1.clone();
2642            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2643
2644            let mut new_sst_id = 100.into();
2645            let x = group_split::split_sst_info_for_level_v2(
2646                &mut cg1.levels[0],
2647                &mut new_sst_id,
2648                split_key,
2649            );
2650
2651            assert_eq!(2, x.len());
2652            assert_eq!(100, x[0].sst_id);
2653            assert_eq!(100 / 2, x[0].sst_size);
2654            assert_eq!(11, x[1].sst_id);
2655            assert_eq!(100, x[1].sst_size);
2656            assert_eq!(vec![TableId::new(4)], x[1].table_ids);
2657
2658            assert_eq!(2, cg1.levels[0].table_infos.len());
2659            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2660            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2661            assert_eq!(
2662                vec![TableId::new(3)],
2663                cg1.levels[0].table_infos[1].table_ids
2664            );
2665        }
2666    }
2667
2668    fn make_sst(sst_id: u64, table_ids: Vec<u32>, sst_size: u64) -> SstableInfo {
2669        SstableInfoInner {
2670            sst_id: sst_id.into(),
2671            object_id: sst_id.into(),
2672            table_ids: table_ids.into_iter().map(TableId::new).collect(),
2673            file_size: sst_size,
2674            sst_size,
2675            uncompressed_file_size: sst_size * 2,
2676            ..Default::default()
2677        }
2678        .into()
2679    }
2680
2681    #[test]
2682    fn test_level_normalize() {
2683        // Mixed: some SSTs empty, some not.
2684        let mut level = Level {
2685            level_idx: 1,
2686            level_type: LevelType::Nonoverlapping,
2687            table_infos: vec![
2688                make_sst(1, vec![1, 2], 100),
2689                make_sst(2, vec![], 200), // empty → removed
2690                make_sst(3, vec![3], 300),
2691                make_sst(4, vec![], 400), // empty → removed
2692            ],
2693            total_file_size: 9999, // intentionally wrong
2694            uncompressed_file_size: 9999,
2695            ..Default::default()
2696        };
2697
2698        level.normalize();
2699
2700        assert_eq!(level.table_infos.len(), 2);
2701        assert_eq!(1, level.table_infos[0].sst_id);
2702        assert_eq!(3, level.table_infos[1].sst_id);
2703        assert_eq!(level.total_file_size, 400);
2704        assert_eq!(level.uncompressed_file_size, 800);
2705
2706        // No empty SSTs: just recompute sizes.
2707        level.total_file_size = 0;
2708        level.uncompressed_file_size = 0;
2709        level.normalize();
2710        assert_eq!(level.table_infos.len(), 2);
2711        assert_eq!(level.total_file_size, 400);
2712        assert_eq!(level.uncompressed_file_size, 800);
2713
2714        // All empty → level becomes empty.
2715        level.table_infos = vec![make_sst(10, vec![], 100), make_sst(11, vec![], 200)];
2716        level.normalize();
2717        assert!(level.table_infos.is_empty());
2718        assert_eq!(level.total_file_size, 0);
2719        assert_eq!(level.uncompressed_file_size, 0);
2720    }
2721
2722    #[test]
2723    fn test_level_delete_ssts() {
2724        let mut level = Level {
2725            level_idx: 1,
2726            level_type: LevelType::Nonoverlapping,
2727            table_infos: vec![
2728                make_sst(1, vec![1], 100),
2729                make_sst(2, vec![2], 200),
2730                make_sst(3, vec![3], 300),
2731            ],
2732            total_file_size: 600,
2733            uncompressed_file_size: 1200,
2734            ..Default::default()
2735        };
2736
2737        let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([2.into()]);
2738        let changed = level.delete_ssts(&delete_ids);
2739
2740        assert!(changed);
2741        assert_eq!(level.table_infos.len(), 2);
2742        assert_eq!(1, level.table_infos[0].sst_id);
2743        assert_eq!(3, level.table_infos[1].sst_id);
2744        assert_eq!(level.total_file_size, 400);
2745        assert_eq!(level.uncompressed_file_size, 800);
2746
2747        // Delete non-existent id → no change, returns false.
2748        let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([999.into()]);
2749        let changed = level.delete_ssts(&delete_ids);
2750        assert!(!changed);
2751        assert_eq!(level.table_infos.len(), 2);
2752    }
2753
2754    #[test]
2755    fn test_level_truncate_tables() {
2756        let mut level = Level {
2757            level_idx: 1,
2758            level_type: LevelType::Nonoverlapping,
2759            table_infos: vec![
2760                make_sst(1, vec![1, 2], 100),
2761                make_sst(2, vec![2, 3], 200),
2762                make_sst(3, vec![2], 300),
2763            ],
2764            total_file_size: 600,
2765            uncompressed_file_size: 1200,
2766            ..Default::default()
2767        };
2768
2769        let truncate_ids = HashSet::from([TableId::new(2)]);
2770        level.truncate_tables(&truncate_ids);
2771
2772        // SST 3 should be removed (was only table_id=2)
2773        assert_eq!(level.table_infos.len(), 2);
2774        assert_eq!(level.table_infos[0].table_ids, vec![TableId::new(1)]);
2775        assert_eq!(level.table_infos[1].table_ids, vec![TableId::new(3)]);
2776        assert_eq!(level.total_file_size, 100 + 200);
2777        assert_eq!(level.uncompressed_file_size, 200 + 400);
2778
2779        // Truncate remaining tables → all SSTs removed.
2780        let truncate_ids = HashSet::from([TableId::new(1), TableId::new(3)]);
2781        level.truncate_tables(&truncate_ids);
2782        assert!(level.table_infos.is_empty());
2783        assert_eq!(level.total_file_size, 0);
2784        assert_eq!(level.uncompressed_file_size, 0);
2785    }
2786
2787    #[test]
2788    fn test_overlapping_level_normalize() {
2789        let mut l0 = OverlappingLevel {
2790            sub_levels: vec![
2791                Level {
2792                    level_idx: 0,
2793                    table_infos: vec![make_sst(1, vec![1], 100)],
2794                    total_file_size: 100,
2795                    uncompressed_file_size: 200,
2796                    sub_level_id: 1,
2797                    ..Default::default()
2798                },
2799                Level {
2800                    level_idx: 0,
2801                    table_infos: vec![], // empty → should be removed
2802                    total_file_size: 0,
2803                    uncompressed_file_size: 0,
2804                    sub_level_id: 2,
2805                    ..Default::default()
2806                },
2807                Level {
2808                    level_idx: 0,
2809                    table_infos: vec![make_sst(3, vec![3], 300)],
2810                    total_file_size: 300,
2811                    uncompressed_file_size: 600,
2812                    sub_level_id: 3,
2813                    ..Default::default()
2814                },
2815            ],
2816            total_file_size: 9999, // intentionally wrong
2817            uncompressed_file_size: 9999,
2818        };
2819
2820        l0.normalize();
2821
2822        assert_eq!(l0.sub_levels.len(), 2);
2823        assert_eq!(l0.sub_levels[0].sub_level_id, 1);
2824        assert_eq!(l0.sub_levels[1].sub_level_id, 3);
2825        assert_eq!(l0.total_file_size, 100 + 300);
2826        assert_eq!(l0.uncompressed_file_size, 200 + 600);
2827
2828        // All sub-levels empty → normalize clears everything.
2829        l0.sub_levels = vec![Level {
2830            level_idx: 0,
2831            table_infos: vec![],
2832            ..Default::default()
2833        }];
2834        l0.normalize();
2835        assert!(l0.sub_levels.is_empty());
2836        assert_eq!(l0.total_file_size, 0);
2837        assert_eq!(l0.uncompressed_file_size, 0);
2838    }
2839
2840    #[test]
2841    fn test_levels_truncate_tables() {
2842        #[expect(deprecated)]
2843        let mut levels = Levels {
2844            l0: OverlappingLevel {
2845                sub_levels: vec![
2846                    Level {
2847                        level_idx: 0,
2848                        table_infos: vec![
2849                            make_sst(1, vec![10], 100), // table 10
2850                            make_sst(2, vec![20], 200), // table 20
2851                        ],
2852                        total_file_size: 300,
2853                        uncompressed_file_size: 600,
2854                        sub_level_id: 1,
2855                        ..Default::default()
2856                    },
2857                    Level {
2858                        level_idx: 0,
2859                        table_infos: vec![
2860                            make_sst(3, vec![10], 150), // table 10
2861                        ],
2862                        total_file_size: 150,
2863                        uncompressed_file_size: 300,
2864                        sub_level_id: 2,
2865                        ..Default::default()
2866                    },
2867                ],
2868                total_file_size: 450,
2869                uncompressed_file_size: 900,
2870            },
2871            levels: vec![Level {
2872                level_idx: 1,
2873                level_type: LevelType::Nonoverlapping,
2874                table_infos: vec![
2875                    make_sst(4, vec![10, 20], 400), // shared SST
2876                    make_sst(5, vec![10], 500),     // table 10 only
2877                ],
2878                total_file_size: 900,
2879                uncompressed_file_size: 1800,
2880                ..Default::default()
2881            }],
2882            group_id: 1.into(),
2883            parent_group_id: 0.into(),
2884            member_table_ids: vec![],
2885            compaction_group_version_id: 0,
2886        };
2887
2888        // Truncate table 10
2889        levels.truncate_tables(&HashSet::from([TableId::new(10)]));
2890
2891        assert_eq!(levels.l0.sub_levels.len(), 1);
2892        assert_eq!(levels.l0.sub_levels[0].sub_level_id, 1);
2893        assert_eq!(levels.l0.sub_levels[0].table_infos.len(), 1);
2894        assert_eq!(2, levels.l0.sub_levels[0].table_infos[0].sst_id);
2895        assert_eq!(levels.l0.sub_levels[0].total_file_size, 200);
2896        assert_eq!(levels.l0.sub_levels[0].uncompressed_file_size, 400);
2897
2898        assert_eq!(levels.l0.total_file_size, 200);
2899        assert_eq!(levels.l0.uncompressed_file_size, 400);
2900
2901        assert_eq!(levels.levels[0].table_infos.len(), 1);
2902        assert_eq!(4, levels.levels[0].table_infos[0].sst_id);
2903        assert_eq!(
2904            levels.levels[0].table_infos[0].table_ids,
2905            vec![TableId::new(20)]
2906        );
2907        assert_eq!(levels.levels[0].total_file_size, 400);
2908        assert_eq!(levels.levels[0].uncompressed_file_size, 800);
2909
2910        assert_eq!(levels.compaction_group_version_id, 1);
2911    }
2912
2913    #[test]
2914    fn test_apply_version_delta_truncate_tables() {
2915        let mut version = HummockVersion {
2916            id: HummockVersionId::new(0),
2917            levels: HashMap::from_iter([(1.into(), {
2918                #[expect(deprecated)]
2919                let levels = Levels {
2920                    l0: OverlappingLevel {
2921                        sub_levels: vec![
2922                            Level {
2923                                level_idx: 0,
2924                                level_type: LevelType::Overlapping,
2925                                table_infos: vec![
2926                                    make_sst(1, vec![100], 50), // only table 100
2927                                    make_sst(2, vec![200], 60), // only table 200
2928                                ],
2929                                total_file_size: 110,
2930                                uncompressed_file_size: 220,
2931                                sub_level_id: 1,
2932                                ..Default::default()
2933                            },
2934                            Level {
2935                                level_idx: 0,
2936                                level_type: LevelType::Overlapping,
2937                                table_infos: vec![
2938                                    make_sst(3, vec![100], 70), // only table 100
2939                                ],
2940                                total_file_size: 70,
2941                                uncompressed_file_size: 140,
2942                                sub_level_id: 2,
2943                                ..Default::default()
2944                            },
2945                        ],
2946                        total_file_size: 180,
2947                        uncompressed_file_size: 360,
2948                    },
2949                    levels: vec![Level {
2950                        level_idx: 1,
2951                        level_type: LevelType::Nonoverlapping,
2952                        table_infos: vec![
2953                            make_sst(4, vec![100, 200], 80), // shared
2954                            make_sst(5, vec![100], 90),      // only table 100
2955                        ],
2956                        total_file_size: 170,
2957                        uncompressed_file_size: 340,
2958                        ..Default::default()
2959                    }],
2960                    group_id: 1.into(),
2961                    parent_group_id: 0.into(),
2962                    member_table_ids: vec![],
2963                    compaction_group_version_id: 0,
2964                };
2965                levels
2966            })]),
2967            ..Default::default()
2968        };
2969
2970        let version_delta = HummockVersionDelta {
2971            id: HummockVersionId::new(1),
2972            group_deltas: HashMap::from_iter([(
2973                1.into(),
2974                GroupDeltas {
2975                    group_deltas: vec![GroupDelta::TruncateTables(HashSet::from([TableId::new(
2976                        100,
2977                    )]))],
2978                },
2979            )]),
2980            ..Default::default()
2981        };
2982
2983        version.apply_version_delta(&version_delta);
2984
2985        let cg = version.get_compaction_group_levels(1.into());
2986
2987        assert_eq!(
2988            cg.l0.sub_levels.len(),
2989            1,
2990            "empty sub-level should be removed"
2991        );
2992        assert_eq!(cg.l0.sub_levels[0].sub_level_id, 1);
2993        assert_eq!(cg.l0.sub_levels[0].table_infos.len(), 1);
2994        assert_eq!(2, cg.l0.sub_levels[0].table_infos[0].sst_id);
2995        for sub_level in &cg.l0.sub_levels {
2996            for sst in &sub_level.table_infos {
2997                assert!(
2998                    !sst.table_ids.is_empty(),
2999                    "SST {} should not have empty table_ids",
3000                    sst.sst_id
3001                );
3002            }
3003        }
3004
3005        assert_eq!(cg.l0.total_file_size, 60);
3006        assert_eq!(cg.l0.uncompressed_file_size, 120);
3007
3008        assert_eq!(cg.levels[0].table_infos.len(), 1);
3009        assert_eq!(4, cg.levels[0].table_infos[0].sst_id);
3010        assert_eq!(
3011            cg.levels[0].table_infos[0].table_ids,
3012            vec![TableId::new(200)]
3013        );
3014        assert_eq!(cg.levels[0].total_file_size, 80);
3015        assert_eq!(cg.levels[0].uncompressed_file_size, 160);
3016
3017        assert_eq!(cg.compaction_group_version_id, 1);
3018    }
3019
3020    #[test]
3021    fn test_apply_version_delta_compact_l0() {
3022        let mut version = HummockVersion {
3023            id: HummockVersionId::new(0),
3024            levels: HashMap::from_iter([(1.into(), {
3025                #[expect(deprecated)]
3026                let levels = Levels {
3027                    l0: OverlappingLevel {
3028                        sub_levels: vec![
3029                            Level {
3030                                level_idx: 0,
3031                                level_type: LevelType::Nonoverlapping,
3032                                table_infos: vec![
3033                                    make_sst(1, vec![1], 100),
3034                                    make_sst(2, vec![2], 200),
3035                                ],
3036                                total_file_size: 300,
3037                                uncompressed_file_size: 600,
3038                                sub_level_id: 1,
3039                                ..Default::default()
3040                            },
3041                            Level {
3042                                level_idx: 0,
3043                                level_type: LevelType::Nonoverlapping,
3044                                table_infos: vec![make_sst(3, vec![3], 300)],
3045                                total_file_size: 300,
3046                                uncompressed_file_size: 600,
3047                                sub_level_id: 2,
3048                                ..Default::default()
3049                            },
3050                        ],
3051                        total_file_size: 600,
3052                        uncompressed_file_size: 1200,
3053                    },
3054                    levels: vec![Level {
3055                        level_idx: 1,
3056                        level_type: LevelType::Nonoverlapping,
3057                        table_infos: vec![],
3058                        total_file_size: 0,
3059                        uncompressed_file_size: 0,
3060                        ..Default::default()
3061                    }],
3062                    group_id: 1.into(),
3063                    parent_group_id: 0.into(),
3064                    member_table_ids: vec![],
3065                    compaction_group_version_id: 0,
3066                };
3067                levels
3068            })]),
3069            ..Default::default()
3070        };
3071
3072        let version_delta = HummockVersionDelta {
3073            id: HummockVersionId::new(1),
3074            group_deltas: HashMap::from_iter([(
3075                1.into(),
3076                GroupDeltas {
3077                    group_deltas: vec![
3078                        GroupDelta::IntraLevel(IntraLevelDelta::new(
3079                            0, // L0
3080                            0,
3081                            HashSet::from([1.into(), 2.into(), 3.into()]),
3082                            vec![],
3083                            0,
3084                            0,
3085                        )),
3086                        GroupDelta::IntraLevel(IntraLevelDelta::new(
3087                            1, // L1
3088                            0,
3089                            HashSet::new(),
3090                            vec![make_sst(10, vec![1, 2, 3], 500)],
3091                            0,
3092                            0,
3093                        )),
3094                    ],
3095                },
3096            )]),
3097            ..Default::default()
3098        };
3099
3100        version.apply_version_delta(&version_delta);
3101
3102        let cg = version.get_compaction_group_levels(1.into());
3103
3104        assert!(cg.l0.sub_levels.is_empty());
3105        assert_eq!(cg.l0.total_file_size, 0);
3106        assert_eq!(cg.l0.uncompressed_file_size, 0);
3107
3108        assert_eq!(cg.levels[0].table_infos.len(), 1);
3109        assert_eq!(10, cg.levels[0].table_infos[0].sst_id);
3110        assert_eq!(cg.levels[0].total_file_size, 500);
3111    }
3112}