risingwave_hummock_sdk/compaction_group/
hummock_version_ext.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::{Arc, LazyLock};
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_common::log::LogSuppressor;
27use risingwave_pb::hummock::{
28    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
29};
30use tracing::warn;
31
32use super::group_split::split_sst_with_table_ids;
33use super::{StateTableId, group_split};
34use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
35use crate::compact_task::is_compaction_task_expired;
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::key_range::KeyRangeCommon;
38use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
41use crate::vector_index::apply_vector_index_delta;
42use crate::version::{
43    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
44    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
45};
46use crate::{
47    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
48};
49
50#[derive(Debug, Clone, Default)]
51pub struct SstDeltaInfo {
52    pub insert_sst_level: u32,
53    pub insert_sst_infos: Vec<SstableInfo>,
54    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
55}
56
57pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
58
59impl<L> HummockVersionCommon<SstableInfo, L> {
60    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
61        self.levels
62            .get(&compaction_group_id)
63            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
64    }
65
66    pub fn get_compaction_group_levels_mut(
67        &mut self,
68        compaction_group_id: CompactionGroupId,
69    ) -> &mut Levels {
70        self.levels
71            .get_mut(&compaction_group_id)
72            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
73    }
74
75    // only scan the sst infos from levels in the specified compaction group (without table change log)
76    pub fn get_sst_ids_by_group_id(
77        &self,
78        compaction_group_id: CompactionGroupId,
79    ) -> impl Iterator<Item = HummockSstableId> + '_ {
80        self.levels
81            .iter()
82            .filter_map(move |(cg_id, level)| {
83                if *cg_id == compaction_group_id {
84                    Some(level)
85                } else {
86                    None
87                }
88            })
89            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
90            .flat_map(|level| level.table_infos.iter())
91            .map(|s| s.sst_id)
92    }
93
94    pub fn level_iter<F: FnMut(&Level) -> bool>(
95        &self,
96        compaction_group_id: CompactionGroupId,
97        mut f: F,
98    ) {
99        if let Some(levels) = self.levels.get(&compaction_group_id) {
100            for sub_level in &levels.l0.sub_levels {
101                if !f(sub_level) {
102                    return;
103                }
104            }
105            for level in &levels.levels {
106                if !f(level) {
107                    return;
108                }
109            }
110        }
111    }
112
113    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
114        // l0 is currently separated from all levels
115        self.levels
116            .get(&compaction_group_id)
117            .map(|group| group.levels.len() + 1)
118            .unwrap_or(0)
119    }
120
121    pub fn safe_epoch_table_watermarks(
122        &self,
123        existing_table_ids: &[TableId],
124    ) -> BTreeMap<TableId, TableWatermarks> {
125        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
126    }
127}
128
129pub fn safe_epoch_table_watermarks_impl(
130    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
131    existing_table_ids: &[TableId],
132) -> BTreeMap<TableId, TableWatermarks> {
133    fn extract_single_table_watermark(
134        table_watermarks: &TableWatermarks,
135    ) -> Option<TableWatermarks> {
136        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
137            Some(TableWatermarks {
138                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
139                direction: table_watermarks.direction,
140                watermark_type: table_watermarks.watermark_type,
141            })
142        } else {
143            None
144        }
145    }
146    table_watermarks
147        .iter()
148        .filter_map(|(table_id, table_watermarks)| {
149            if !existing_table_ids.contains(table_id) {
150                None
151            } else {
152                extract_single_table_watermark(table_watermarks)
153                    .map(|table_watermarks| (*table_id, table_watermarks))
154            }
155        })
156        .collect()
157}
158
159pub fn safe_epoch_read_table_watermarks_impl(
160    safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
161) -> BTreeMap<TableId, ReadTableWatermark> {
162    safe_epoch_watermarks
163        .into_iter()
164        .map(|(table_id, watermarks)| {
165            assert_eq!(watermarks.watermarks.len(), 1);
166            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
167            let mut vnode_watermark_map = BTreeMap::new();
168            for vnode_watermark in vnode_watermarks.iter() {
169                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
170                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
171                    assert!(
172                        vnode_watermark_map
173                            .insert(vnode, watermark.clone())
174                            .is_none(),
175                        "duplicate table watermark on vnode {}",
176                        vnode.to_index()
177                    );
178                }
179            }
180            (
181                table_id,
182                ReadTableWatermark {
183                    direction: watermarks.direction,
184                    vnode_watermarks: vnode_watermark_map,
185                },
186            )
187        })
188        .collect()
189}
190
191impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
192    pub fn count_new_ssts_in_group_split(
193        &self,
194        parent_group_id: CompactionGroupId,
195        split_key: Bytes,
196    ) -> u64 {
197        self.levels
198            .get(&parent_group_id)
199            .map_or(0, |parent_levels| {
200                let l0 = &parent_levels.l0;
201                let mut split_count = 0;
202                for sub_level in &l0.sub_levels {
203                    assert!(!sub_level.table_infos.is_empty());
204
205                    if sub_level.level_type == PbLevelType::Overlapping {
206                        // TODO: use table_id / vnode / key_range filter
207                        split_count += sub_level
208                            .table_infos
209                            .iter()
210                            .map(|sst| {
211                                if let group_split::SstSplitType::Both =
212                                    group_split::need_to_split(sst, split_key.clone())
213                                {
214                                    2
215                                } else {
216                                    0
217                                }
218                            })
219                            .sum::<u64>();
220                        continue;
221                    }
222
223                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
224                    let sst = sub_level.table_infos.get(pos).unwrap();
225
226                    if let group_split::SstSplitType::Both =
227                        group_split::need_to_split(sst, split_key.clone())
228                    {
229                        split_count += 2;
230                    }
231                }
232
233                for level in &parent_levels.levels {
234                    if level.table_infos.is_empty() {
235                        continue;
236                    }
237                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
238                    let sst = level.table_infos.get(pos).unwrap();
239                    if let group_split::SstSplitType::Both =
240                        group_split::need_to_split(sst, split_key.clone())
241                    {
242                        split_count += 2;
243                    }
244                }
245
246                split_count
247            })
248    }
249
250    pub fn init_with_parent_group(
251        &mut self,
252        parent_group_id: CompactionGroupId,
253        group_id: CompactionGroupId,
254        member_table_ids: BTreeSet<StateTableId>,
255        new_sst_start_id: HummockSstableId,
256    ) {
257        let mut new_sst_id = new_sst_start_id;
258        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
259            if new_sst_start_id != 0 {
260                if cfg!(debug_assertions) {
261                    panic!(
262                        "non-zero sst start id {} for NewCompactionGroup",
263                        new_sst_start_id
264                    );
265                } else {
266                    warn!(
267                        %new_sst_start_id,
268                        "non-zero sst start id for NewCompactionGroup"
269                    );
270                }
271            }
272            return;
273        } else if !self.levels.contains_key(&parent_group_id) {
274            unreachable!(
275                "non-existing parent group id {} to init from",
276                parent_group_id
277            );
278        }
279        let [parent_levels, cur_levels] = self
280            .levels
281            .get_disjoint_mut([&parent_group_id, &group_id])
282            .map(|res| res.unwrap());
283        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
284        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
285        parent_levels.compaction_group_version_id += 1;
286        cur_levels.compaction_group_version_id += 1;
287        let l0 = &mut parent_levels.l0;
288        {
289            for sub_level in &mut l0.sub_levels {
290                let target_l0 = &mut cur_levels.l0;
291                // Remove SST from sub level may result in empty sub level. It will be purged
292                // whenever another compaction task is finished.
293                let insert_table_infos =
294                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
295                sub_level.normalize();
296                if insert_table_infos.is_empty() {
297                    continue;
298                }
299                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
300                    Ok(idx) => {
301                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
302                    }
303                    Err(idx) => {
304                        insert_new_sub_level(
305                            target_l0,
306                            sub_level.sub_level_id,
307                            sub_level.level_type,
308                            insert_table_infos,
309                            Some(idx),
310                        );
311                    }
312                }
313            }
314            l0.normalize();
315        }
316        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
317            let insert_table_infos =
318                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
319            cur_levels.levels[idx].total_file_size += insert_table_infos
320                .iter()
321                .map(|sst| sst.sst_size)
322                .sum::<u64>();
323            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
324                .iter()
325                .map(|sst| sst.uncompressed_file_size)
326                .sum::<u64>();
327            cur_levels.levels[idx]
328                .table_infos
329                .extend(insert_table_infos);
330            cur_levels.levels[idx]
331                .table_infos
332                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
333            assert!(can_concat(&cur_levels.levels[idx].table_infos));
334            level.normalize();
335        }
336
337        assert!(
338            parent_levels
339                .l0
340                .sub_levels
341                .iter()
342                .all(|level| !level.table_infos.is_empty())
343        );
344        assert!(
345            cur_levels
346                .l0
347                .sub_levels
348                .iter()
349                .all(|level| !level.table_infos.is_empty())
350        );
351    }
352
353    pub fn build_sst_delta_infos(
354        &self,
355        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
356    ) -> Vec<SstDeltaInfo> {
357        let mut infos = vec![];
358
359        // Skip trivial move delta for refiller
360        // The trivial move task only changes the position of the sst in the lsm, it does not modify the object information corresponding to the sst, and does not need to re-execute the refill.
361        if version_delta.trivial_move {
362            return infos;
363        }
364
365        for (group_id, group_deltas) in &version_delta.group_deltas {
366            let mut info = SstDeltaInfo::default();
367
368            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
369            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
370
371            // Build only if all deltas are intra level deltas.
372            if !group_deltas.group_deltas.iter().all(|delta| {
373                matches!(
374                    delta,
375                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
376                )
377            }) {
378                continue;
379            }
380
381            // TODO(MrCroxx): At most one insert delta is allowed here. It's okay for now with the
382            // current `hummock::manager::gen_version_delta` implementation. Better refactor the
383            // struct to reduce conventions.
384            for group_delta in &group_deltas.group_deltas {
385                match group_delta {
386                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
387                        if !inserted_table_infos.is_empty() {
388                            info.insert_sst_level = 0;
389                            info.insert_sst_infos
390                                .extend(inserted_table_infos.iter().cloned());
391                        }
392                    }
393                    GroupDeltaCommon::IntraLevel(intra_level) => {
394                        if !intra_level.inserted_table_infos.is_empty() {
395                            info.insert_sst_level = intra_level.level_idx;
396                            info.insert_sst_infos
397                                .extend(intra_level.inserted_table_infos.iter().cloned());
398                        }
399                        if !intra_level.removed_table_ids.is_empty() {
400                            for id in &intra_level.removed_table_ids {
401                                if intra_level.level_idx == 0 {
402                                    removed_l0_ssts.insert(*id);
403                                } else {
404                                    removed_ssts
405                                        .entry(intra_level.level_idx)
406                                        .or_default()
407                                        .insert(*id);
408                                }
409                            }
410                        }
411                    }
412                    GroupDeltaCommon::GroupConstruct(_)
413                    | GroupDeltaCommon::GroupDestroy(_)
414                    | GroupDeltaCommon::GroupMerge(_)
415                    | GroupDeltaCommon::TruncateTables(_) => {}
416                }
417            }
418
419            let group = self.levels.get(group_id).unwrap();
420            for l0_sub_level in &group.level0().sub_levels {
421                for sst_info in &l0_sub_level.table_infos {
422                    if removed_l0_ssts.remove(&sst_info.sst_id) {
423                        info.delete_sst_object_ids.push(sst_info.object_id);
424                    }
425                }
426            }
427            for level in &group.levels {
428                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
429                    for sst_info in &level.table_infos {
430                        if removed_level_ssts.remove(&sst_info.sst_id) {
431                            info.delete_sst_object_ids.push(sst_info.object_id);
432                        }
433                    }
434                    if !removed_level_ssts.is_empty() {
435                        tracing::error!(
436                            "removed_level_ssts is not empty: {:?}",
437                            removed_level_ssts,
438                        );
439                    }
440                    debug_assert!(removed_level_ssts.is_empty());
441                }
442            }
443
444            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
445                tracing::error!(
446                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
447                    removed_l0_ssts,
448                    removed_ssts
449                );
450            }
451            debug_assert!(removed_l0_ssts.is_empty());
452            debug_assert!(removed_ssts.is_empty());
453
454            infos.push(info);
455        }
456
457        infos
458    }
459
460    pub fn apply_version_delta(
461        &mut self,
462        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
463    ) {
464        assert_eq!(self.id, version_delta.prev_id);
465
466        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
467            &version_delta.state_table_info_delta,
468            &version_delta.removed_table_ids,
469        );
470
471        #[expect(deprecated)]
472        {
473            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
474                is_commit_epoch = true;
475                tracing::trace!(
476                    "max committed epoch bumped but no table committed epoch is changed"
477                );
478            }
479        }
480
481        // apply to `levels`, which is different compaction groups
482        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
483            let mut is_applied_l0_compact = false;
484            for group_delta in &group_deltas.group_deltas {
485                match group_delta {
486                    GroupDeltaCommon::GroupConstruct(group_construct) => {
487                        let mut new_levels = build_initial_compaction_group_levels(
488                            *compaction_group_id,
489                            group_construct.get_group_config().unwrap(),
490                        );
491                        let parent_group_id = group_construct.parent_group_id;
492                        new_levels.parent_group_id = parent_group_id;
493                        #[expect(deprecated)]
494                        // for backward-compatibility of previous hummock version delta
495                        new_levels
496                            .member_table_ids
497                            .clone_from(&group_construct.table_ids);
498                        self.levels.insert(*compaction_group_id, new_levels);
499                        let member_table_ids = if group_construct.version()
500                            >= CompatibilityVersion::NoMemberTableIds
501                        {
502                            self.state_table_info
503                                .compaction_group_member_table_ids(*compaction_group_id)
504                                .iter()
505                                .copied()
506                                .collect()
507                        } else {
508                            #[expect(deprecated)]
509                            // for backward-compatibility of previous hummock version delta
510                            BTreeSet::from_iter(
511                                group_construct.table_ids.iter().copied().map(Into::into),
512                            )
513                        };
514
515                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
516                            let split_key = if group_construct.split_key.is_some() {
517                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
518                            } else {
519                                None
520                            };
521                            self.init_with_parent_group_v2(
522                                parent_group_id,
523                                *compaction_group_id,
524                                group_construct.new_sst_start_id,
525                                split_key.clone(),
526                            );
527                        } else {
528                            // for backward-compatibility of previous hummock version delta
529                            self.init_with_parent_group(
530                                parent_group_id,
531                                *compaction_group_id,
532                                member_table_ids,
533                                group_construct.new_sst_start_id,
534                            );
535                        }
536                    }
537                    GroupDeltaCommon::GroupMerge(group_merge) => {
538                        tracing::info!(
539                            "group_merge left {:?} right {:?}",
540                            group_merge.left_group_id,
541                            group_merge.right_group_id
542                        );
543                        self.merge_compaction_group(
544                            group_merge.left_group_id,
545                            group_merge.right_group_id,
546                        )
547                    }
548                    GroupDeltaCommon::IntraLevel(level_delta) => {
549                        let levels =
550                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
551                                panic!("compaction group {} does not exist", compaction_group_id)
552                            });
553                        if is_commit_epoch {
554                            assert!(
555                                level_delta.removed_table_ids.is_empty(),
556                                "no sst should be deleted when committing an epoch"
557                            );
558
559                            let IntraLevelDelta {
560                                level_idx,
561                                l0_sub_level_id,
562                                inserted_table_infos,
563                                ..
564                            } = level_delta;
565                            {
566                                assert_eq!(
567                                    *level_idx, 0,
568                                    "we should only add to L0 when we commit an epoch."
569                                );
570                                if !inserted_table_infos.is_empty() {
571                                    insert_new_sub_level(
572                                        &mut levels.l0,
573                                        *l0_sub_level_id,
574                                        PbLevelType::Overlapping,
575                                        inserted_table_infos.clone(),
576                                        None,
577                                    );
578                                }
579                            }
580                        } else {
581                            // The delta is caused by compaction.
582                            levels.apply_compact_ssts(
583                                level_delta,
584                                self.state_table_info
585                                    .compaction_group_member_table_ids(*compaction_group_id),
586                            );
587                            if level_delta.level_idx == 0 {
588                                is_applied_l0_compact = true;
589                            }
590                        }
591                    }
592                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
593                        let levels =
594                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
595                                panic!("compaction group {} does not exist", compaction_group_id)
596                            });
597                        assert!(is_commit_epoch);
598
599                        if !inserted_table_infos.is_empty() {
600                            let next_l0_sub_level_id = levels
601                                .l0
602                                .sub_levels
603                                .last()
604                                .map(|level| level.sub_level_id + 1)
605                                .unwrap_or(1);
606
607                            insert_new_sub_level(
608                                &mut levels.l0,
609                                next_l0_sub_level_id,
610                                PbLevelType::Overlapping,
611                                inserted_table_infos.clone(),
612                                None,
613                            );
614                        }
615                    }
616                    GroupDeltaCommon::GroupDestroy(_) => {
617                        self.levels.remove(compaction_group_id);
618                    }
619
620                    GroupDeltaCommon::TruncateTables(table_ids) => {
621                        self.levels
622                            .get_mut(compaction_group_id)
623                            .unwrap_or_else(|| {
624                                panic!("compaction group {} does not exist", compaction_group_id)
625                            })
626                            .truncate_tables(table_ids);
627                    }
628                }
629            }
630
631            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
632            {
633                levels.l0.normalize();
634            }
635        }
636        self.id = version_delta.id;
637        #[expect(deprecated)]
638        {
639            self.max_committed_epoch = version_delta.max_committed_epoch;
640        }
641
642        // apply to table watermark
643
644        // Store the table watermarks that needs to be updated. None means to remove the table watermark of the table id
645        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
646            HashMap::new();
647
648        // apply to table watermark
649        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
650            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
651                if version_delta.removed_table_ids.contains(table_id) {
652                    modified_table_watermarks.insert(*table_id, None);
653                } else {
654                    let mut current_table_watermarks = (**current_table_watermarks).clone();
655                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
656                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
657                }
658            } else {
659                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
660            }
661        }
662        for (table_id, table_watermarks) in &self.table_watermarks {
663            let safe_epoch = if let Some(state_table_info) =
664                self.state_table_info.info().get(table_id)
665                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
666                && state_table_info.committed_epoch > *oldest_epoch
667            {
668                // safe epoch has progressed, need further clear.
669                state_table_info.committed_epoch
670            } else {
671                // safe epoch not progressed or the table has been removed. No need to truncate
672                continue;
673            };
674            let table_watermarks = modified_table_watermarks
675                .entry(*table_id)
676                .or_insert_with(|| Some((**table_watermarks).clone()));
677            if let Some(table_watermarks) = table_watermarks {
678                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
679            }
680        }
681        // apply the staging table watermark to hummock version
682        for (table_id, table_watermarks) in modified_table_watermarks {
683            if let Some(table_watermarks) = table_watermarks {
684                self.table_watermarks
685                    .insert(table_id, Arc::new(table_watermarks));
686            } else {
687                self.table_watermarks.remove(&table_id);
688            }
689        }
690
691        // apply to table change log
692        Self::apply_change_log_delta(
693            &mut self.table_change_log,
694            &version_delta.change_log_delta,
695            &version_delta.removed_table_ids,
696            &version_delta.state_table_info_delta,
697            &changed_table_info,
698        );
699
700        // apply to vector index
701        apply_vector_index_delta(
702            &mut self.vector_indexes,
703            &version_delta.vector_index_delta,
704            &version_delta.removed_table_ids,
705        );
706    }
707
708    pub fn apply_change_log_delta<T: Clone>(
709        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
710        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
711        removed_table_ids: &HashSet<TableId>,
712        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
713        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
714    ) {
715        for (table_id, change_log_delta) in change_log_delta {
716            let new_change_log = &change_log_delta.new_log;
717            match table_change_log.entry(*table_id) {
718                Entry::Occupied(entry) => {
719                    let change_log = entry.into_mut();
720                    change_log.add_change_log(new_change_log.clone());
721                }
722                Entry::Vacant(entry) => {
723                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
724                }
725            };
726        }
727
728        // If a table has no new change log entry (even an empty one), it means we have stopped maintained
729        // the change log for the table, and then we will remove the table change log.
730        // The table change log will also be removed when the table id is removed.
731        table_change_log.retain(|table_id, _| {
732            if removed_table_ids.contains(table_id) {
733                return false;
734            }
735            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
736                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
737                // the table exists previously, and its committed epoch has progressed.
738            } else {
739                // otherwise, the table change log should be kept anyway
740                return true;
741            }
742            let contains = change_log_delta.contains_key(table_id);
743            if !contains {
744                static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
745                    LazyLock::new(|| LogSuppressor::per_second(1));
746                if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
747                    warn!(
748                        suppressed_count,
749                        %table_id,
750                        "table change log dropped due to no further change log at newly committed epoch"
751                    );
752                }
753            }
754            contains
755        });
756
757        // truncate the remaining table change log
758        for (table_id, change_log_delta) in change_log_delta {
759            if let Some(change_log) = table_change_log.get_mut(table_id) {
760                change_log.truncate(change_log_delta.truncate_epoch);
761            }
762        }
763    }
764
765    pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
766        let mut ret: BTreeMap<_, _> = BTreeMap::new();
767        for (compaction_group_id, group) in &self.levels {
768            let mut levels = vec![];
769            levels.extend(group.l0.sub_levels.iter());
770            levels.extend(group.levels.iter());
771            for level in levels {
772                for table_info in &level.table_infos {
773                    if table_info.sst_id.as_raw_id() == table_info.object_id.as_raw_id() {
774                        continue;
775                    }
776                    let object_id = table_info.object_id;
777                    let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
778                    entry
779                        .entry(*compaction_group_id)
780                        .or_default()
781                        .push(table_info.sst_id)
782                }
783            }
784        }
785        ret
786    }
787
788    pub fn merge_compaction_group(
789        &mut self,
790        left_group_id: CompactionGroupId,
791        right_group_id: CompactionGroupId,
792    ) {
793        // Double check
794        let left_group_id_table_ids = self
795            .state_table_info
796            .compaction_group_member_table_ids(left_group_id)
797            .iter();
798        let right_group_id_table_ids = self
799            .state_table_info
800            .compaction_group_member_table_ids(right_group_id)
801            .iter();
802
803        assert!(
804            left_group_id_table_ids
805                .chain(right_group_id_table_ids)
806                .is_sorted()
807        );
808
809        let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
810        let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
811            panic!(
812                "compaction group should exist right {} all {:?}",
813                right_group_id, total_cg
814            )
815        });
816
817        let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
818            panic!(
819                "compaction group should exist left {} all {:?}",
820                left_group_id, total_cg
821            )
822        });
823
824        group_split::merge_levels(left_levels, right_levels);
825    }
826
    /// Initializes compaction group `group_id` by splitting SSTs out of `parent_group_id`.
    ///
    /// * If `parent_group_id` is `StaticCompactionGroupId::NewCompactionGroup`, nothing is split:
    ///   a non-zero `new_sst_start_id` panics in debug builds and only logs a warning otherwise,
    ///   then the function returns without touching `self.levels`.
    /// * Otherwise both groups' `compaction_group_version_id` are bumped, and, when `split_key` is
    ///   `Some`, every parent level (L0 sub-levels and non-L0 levels) is split with
    ///   `group_split::split_sst_info_for_level_v2`. The SSTs it returns are moved into the
    ///   matching level of `group_id`. The exact side of `split_key` that moves is decided by
    ///   that helper — NOTE(review): confirm in `group_split`.
    /// * `new_sst_start_id` seeds `new_sst_id`, which is passed by `&mut` to the splitter.
    ///   It is presumably advanced as new SST ids are allocated; confirm against
    ///   `split_sst_info_for_level_v2`.
    ///
    /// # Panics
    /// * `unreachable!` if `parent_group_id` is not in `self.levels`;
    /// * if `group_id` is not in `self.levels` (the `unwrap` after `get_disjoint_mut`);
    /// * if a moved non-L0 level is not concat-able, or if either group ends up with an empty
    ///   L0 sub-level.
    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: HummockSstableId,
        split_key: Option<Bytes>,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
            // A brand-new group has no parent SSTs to split, so no SST id should be reserved.
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        // L0: move the split-off part of every parent sub-level into the new group's L0.
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Remove SST from sub level may result in empty sub level. It will be purged
                // whenever another compaction task is finished.
                // NOTE(review): in this function, empty parent sub-levels are also dropped by the
                // `l0.normalize()` call right after this loop.
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                if insert_table_infos.is_empty() {
                    continue;
                }

                // Drop parent SSTs whose `table_ids` became empty and refresh the sub-level sizes.
                sub_level.normalize();
                // `Ok(idx)`: the target L0 already has a matching sub-level at `idx`, so the SSTs
                // are merged into it. `Err(idx)`: a new sub-level with the parent's id and type is
                // inserted at `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }
            // Remove emptied parent sub-levels and recompute the parent L0 totals.
            l0.normalize();
        }

        // Non-L0 levels: the split-off SSTs go to the level with the same index in the new group.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Keep the target level ordered by key range; it must remain concat-able.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level.normalize();
        }

        // Post-condition: neither group may keep an empty L0 sub-level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
949}
950
951impl<T> HummockVersionCommon<T>
952where
953    T: SstableIdReader + ObjectIdReader,
954{
955    pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
956        self.get_sst_infos(exclude_change_log)
957            .map(|s| s.object_id())
958            .collect()
959    }
960
961    pub fn get_object_ids(
962        &self,
963        exclude_change_log: bool,
964    ) -> impl Iterator<Item = HummockObjectId> + '_ {
965        // DO NOT REMOVE THIS LINE
966        // This is to ensure that when adding new variant to `HummockObjectId`,
967        // the compiler will warn us if we forget to handle it here.
968        match HummockObjectId::Sstable(0.into()) {
969            HummockObjectId::Sstable(_) => {}
970            HummockObjectId::VectorFile(_) => {}
971            HummockObjectId::HnswGraphFile(_) => {}
972        };
973        self.get_sst_infos(exclude_change_log)
974            .map(|s| HummockObjectId::Sstable(s.object_id()))
975            .chain(
976                self.vector_indexes
977                    .values()
978                    .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
979            )
980    }
981
982    pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
983        self.get_sst_infos(exclude_change_log)
984            .map(|s| s.sst_id())
985            .collect()
986    }
987
988    pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
989        let may_table_change_log = if exclude_change_log {
990            None
991        } else {
992            Some(self.table_change_log.values())
993        };
994        self.get_combined_levels()
995            .flat_map(|level| level.table_infos.iter())
996            .chain(
997                may_table_change_log
998                    .map(|v| {
999                        v.flat_map(|table_change_log| {
1000                            table_change_log.iter().flat_map(|epoch_change_log| {
1001                                epoch_change_log
1002                                    .old_value
1003                                    .iter()
1004                                    .chain(epoch_change_log.new_value.iter())
1005                            })
1006                        })
1007                    })
1008                    .into_iter()
1009                    .flatten(),
1010            )
1011    }
1012}
1013
impl Levels {
    /// Applies one intra-level delta produced by a compaction task to this group's levels.
    ///
    /// * If `is_compaction_task_expired` reports the delta's `compaction_group_version_id` as
    ///   expired relative to `self.compaction_group_version_id`, a warning is logged and nothing
    ///   is changed.
    /// * The SSTs whose `sst_id` is in the delta's `removed_table_ids` are deleted. Despite its
    ///   field name, that set holds SST ids, since it is passed to `delete_ssts`. For
    ///   `level_idx == 0` they are deleted from every L0 sub-level; otherwise from
    ///   `self.levels[level_idx - 1]`.
    /// * The delta's `inserted_table_infos` are inserted via `level_insert_ssts`. For L0 this
    ///   targets the sub-level whose id equals `l0_sub_level_id`, which must exist.
    ///
    /// `vnode_partition_count` of the target level is updated only in the cases described
    /// inline below.
    ///
    /// NOTE(review): this does not refresh `self.l0.total_file_size` /
    /// `uncompressed_file_size`, and it does not drop L0 sub-levels emptied by the deletion.
    /// Presumably the caller normalizes L0 afterwards — confirm.
    ///
    /// # Panics
    /// Panics if the L0 target sub-level is missing, if `level_idx - 1` is out of bounds, or if
    /// `level_insert_ssts` finds the resulting level not concat-able.
    pub(crate) fn apply_compact_ssts(
        &mut self,
        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
        member_table_ids: &BTreeSet<TableId>,
    ) {
        let IntraLevelDeltaCommon {
            level_idx,
            l0_sub_level_id,
            inserted_table_infos: insert_table_infos,
            vnode_partition_count,
            removed_table_ids: delete_sst_ids_set,
            compaction_group_version_id,
        } = level_delta;
        let new_vnode_partition_count = *vnode_partition_count;

        // Reject deltas from compaction tasks created before the last compaction group
        // operation bumped `compaction_group_version_id`.
        if is_compaction_task_expired(
            self.compaction_group_version_id,
            *compaction_group_version_id,
        ) {
            warn!(
                current_compaction_group_version_id = self.compaction_group_version_id,
                delta_compaction_group_version_id = compaction_group_version_id,
                level_idx,
                l0_sub_level_id,
                insert_table_infos = ?insert_table_infos
                    .iter()
                    .map(|sst| (sst.sst_id, sst.object_id))
                    .collect_vec(),
                ?delete_sst_ids_set,
                "This VersionDelta may be committed by an expired compact task. Please check it."
            );
            return;
        }
        if !delete_sst_ids_set.is_empty() {
            if *level_idx == 0 {
                // The delta does not name the source sub-levels, so every L0 sub-level is scanned.
                for level in &mut self.l0.sub_levels {
                    level.delete_ssts(delete_sst_ids_set);
                }
            } else {
                let idx = *level_idx as usize - 1;
                self.levels[idx].delete_ssts(delete_sst_ids_set);
            }
        }

        if !insert_table_infos.is_empty() {
            let insert_sst_level_id = *level_idx;
            let insert_sub_level_id = *l0_sub_level_id;
            if insert_sst_level_id == 0 {
                let l0 = &mut self.l0;
                // Binary search for the target sub-level by id (sub-levels are kept in
                // ascending `sub_level_id` order).
                let index = l0
                    .sub_levels
                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
                assert!(
                    index < l0.sub_levels.len()
                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
                    "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
                    insert_sub_level_id,
                    delete_sst_ids_set,
                    l0.sub_levels
                        .iter()
                        .map(|level| level.sub_level_id)
                        .collect_vec()
                );
                if l0.sub_levels[index].table_infos.is_empty()
                    && member_table_ids.len() == 1
                    && insert_table_infos.iter().all(|sst| {
                        sst.table_ids.len() == 1
                            && sst.table_ids[0]
                                == *member_table_ids.iter().next().expect("non-empty")
                    })
                {
                    // Only change vnode_partition_count for group which has only one state-table.
                    // Only change vnode_partition_count for level which update all sst files in this compact task.
                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
                }
                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
            } else {
                let idx = insert_sst_level_id as usize - 1;
                if self.levels[idx].table_infos.is_empty()
                    && insert_table_infos
                        .iter()
                        .all(|sst| sst.table_ids.len() == 1)
                {
                    // The whole level is rewritten by this task and every new SST holds a single
                    // table: adopt the task's partition count.
                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
                } else if self.levels[idx].vnode_partition_count != 0
                    && new_vnode_partition_count == 0
                    && member_table_ids.len() > 1
                {
                    // Multi-table group receiving unpartitioned output: clear the partitioning.
                    self.levels[idx].vnode_partition_count = 0;
                }
                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
            }
        }
    }

    /// Truncate specified table ids from all levels, remove emptied SSTs and sub-levels,
    /// then bump `compaction_group_version_id`.
    ///
    /// Bumping the version id makes compaction tasks created before the truncation count as
    /// expired in `apply_compact_ssts`.
    pub(crate) fn truncate_tables(&mut self, table_ids: &HashSet<TableId>) {
        for level in self.l0.sub_levels.iter_mut().chain(self.levels.iter_mut()) {
            level.truncate_tables(table_ids);
        }
        // Drop sub-levels emptied above and recompute the L0 totals.
        self.l0.normalize();
        self.compaction_group_version_id += 1;
    }
}
1120
1121impl<T, L> HummockVersionCommon<T, L> {
1122    pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1123        self.levels
1124            .values()
1125            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1126    }
1127}
1128
1129pub fn build_initial_compaction_group_levels(
1130    group_id: impl Into<CompactionGroupId>,
1131    compaction_config: &CompactionConfig,
1132) -> Levels {
1133    let mut levels = vec![];
1134    for l in 0..compaction_config.get_max_level() {
1135        levels.push(Level {
1136            level_idx: (l + 1) as u32,
1137            level_type: PbLevelType::Nonoverlapping,
1138            table_infos: vec![],
1139            total_file_size: 0,
1140            sub_level_id: 0,
1141            uncompressed_file_size: 0,
1142            vnode_partition_count: 0,
1143        });
1144    }
1145    #[expect(deprecated)] // for backward-compatibility of previous hummock version delta
1146    Levels {
1147        levels,
1148        l0: OverlappingLevel {
1149            sub_levels: vec![],
1150            total_file_size: 0,
1151            uncompressed_file_size: 0,
1152        },
1153        group_id: group_id.into(),
1154        parent_group_id: 0.into(),
1155        member_table_ids: vec![],
1156        compaction_group_version_id: 0,
1157    }
1158}
1159
1160fn split_sst_info_for_level(
1161    member_table_ids: &BTreeSet<TableId>,
1162    level: &mut Level,
1163    new_sst_id: &mut HummockSstableId,
1164) -> Vec<SstableInfo> {
1165    // Remove SST from sub level may result in empty sub level. It will be purged
1166    // whenever another compaction task is finished.
1167    let mut insert_table_infos = vec![];
1168    for sst_info in &mut level.table_infos {
1169        let removed_table_ids = sst_info
1170            .table_ids
1171            .iter()
1172            .filter(|table_id| member_table_ids.contains(*table_id))
1173            .cloned()
1174            .collect_vec();
1175        let sst_size = sst_info.sst_size;
1176        if sst_size / 2 == 0 {
1177            tracing::warn!(
1178                id = %sst_info.sst_id,
1179                object_id = %sst_info.object_id,
1180                sst_size = sst_info.sst_size,
1181                file_size = sst_info.file_size,
1182                "Sstable sst_size is under expected",
1183            );
1184        };
1185        if !removed_table_ids.is_empty() {
1186            let (modified_sst, branch_sst) = split_sst_with_table_ids(
1187                sst_info,
1188                new_sst_id,
1189                sst_size / 2,
1190                sst_size / 2,
1191                member_table_ids.iter().cloned().collect_vec(),
1192            );
1193            *sst_info = modified_sst;
1194            insert_table_infos.push(branch_sst);
1195        }
1196    }
1197    insert_table_infos
1198}
1199
1200/// Gets all compaction group ids.
1201pub fn get_compaction_group_ids(
1202    version: &HummockVersion,
1203) -> impl Iterator<Item = CompactionGroupId> + '_ {
1204    version.levels.keys().cloned()
1205}
1206
1207pub fn get_table_compaction_group_id_mapping(
1208    version: &HummockVersion,
1209) -> HashMap<StateTableId, CompactionGroupId> {
1210    version
1211        .state_table_info
1212        .info()
1213        .iter()
1214        .map(|(table_id, info)| (*table_id, info.compaction_group_id))
1215        .collect()
1216}
1217
1218/// Gets all SSTs in `group_id`
1219pub fn get_compaction_group_ssts(
1220    version: &HummockVersion,
1221    group_id: CompactionGroupId,
1222) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1223    let group_levels = version.get_compaction_group_levels(group_id);
1224    group_levels
1225        .l0
1226        .sub_levels
1227        .iter()
1228        .rev()
1229        .chain(group_levels.levels.iter())
1230        .flat_map(|level| {
1231            level
1232                .table_infos
1233                .iter()
1234                .map(|table_info| (table_info.object_id, table_info.sst_id))
1235        })
1236}
1237
1238pub fn new_sub_level(
1239    sub_level_id: u64,
1240    level_type: PbLevelType,
1241    table_infos: Vec<SstableInfo>,
1242) -> Level {
1243    if level_type == PbLevelType::Nonoverlapping {
1244        debug_assert!(
1245            can_concat(&table_infos),
1246            "sst of non-overlapping level is not concat-able: {:?}",
1247            table_infos
1248        );
1249    }
1250    let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1251    let uncompressed_file_size = table_infos
1252        .iter()
1253        .map(|table| table.uncompressed_file_size)
1254        .sum();
1255    Level {
1256        level_idx: 0,
1257        level_type,
1258        table_infos,
1259        total_file_size,
1260        sub_level_id,
1261        uncompressed_file_size,
1262        vnode_partition_count: 0,
1263    }
1264}
1265
1266pub fn add_ssts_to_sub_level(
1267    l0: &mut OverlappingLevel,
1268    sub_level_idx: usize,
1269    insert_table_infos: Vec<SstableInfo>,
1270) {
1271    insert_table_infos.iter().for_each(|sst| {
1272        l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1273        l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1274        l0.total_file_size += sst.sst_size;
1275        l0.uncompressed_file_size += sst.uncompressed_file_size;
1276    });
1277    l0.sub_levels[sub_level_idx]
1278        .table_infos
1279        .extend(insert_table_infos);
1280    if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1281        l0.sub_levels[sub_level_idx]
1282            .table_infos
1283            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1284        assert!(
1285            can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1286            "sstable ids: {:?}",
1287            l0.sub_levels[sub_level_idx]
1288                .table_infos
1289                .iter()
1290                .map(|sst| sst.sst_id)
1291                .collect_vec()
1292        );
1293    }
1294}
1295
1296/// `None` value of `sub_level_insert_hint` means append.
1297pub fn insert_new_sub_level(
1298    l0: &mut OverlappingLevel,
1299    insert_sub_level_id: u64,
1300    level_type: PbLevelType,
1301    insert_table_infos: Vec<SstableInfo>,
1302    sub_level_insert_hint: Option<usize>,
1303) {
1304    if insert_sub_level_id == u64::MAX {
1305        return;
1306    }
1307    let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1308        insert_pos
1309    } else {
1310        if let Some(newest_level) = l0.sub_levels.last() {
1311            assert!(
1312                newest_level.sub_level_id < insert_sub_level_id,
1313                "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1314                newest_level.sub_level_id,
1315                insert_sub_level_id,
1316                l0,
1317            );
1318        }
1319        l0.sub_levels.len()
1320    };
1321    #[cfg(debug_assertions)]
1322    {
1323        if insert_pos > 0
1324            && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1325        {
1326            debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1327        }
1328        if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1329            debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1330        }
1331    }
1332    // All files will be committed in one new Overlapping sub-level and become
1333    // Nonoverlapping  after at least one compaction.
1334    let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1335    l0.total_file_size += level.total_file_size;
1336    l0.uncompressed_file_size += level.uncompressed_file_size;
1337    l0.sub_levels.insert(insert_pos, level);
1338}
1339
1340impl Level {
1341    fn recompute_size(&mut self) {
1342        self.total_file_size = self
1343            .table_infos
1344            .iter()
1345            .map(|table| table.sst_size)
1346            .sum::<u64>();
1347        self.uncompressed_file_size = self
1348            .table_infos
1349            .iter()
1350            .map(|table| table.uncompressed_file_size)
1351            .sum::<u64>();
1352    }
1353
1354    /// Remove SSTs with empty `table_ids`, then recompute sizes.
1355    fn normalize(&mut self) {
1356        self.table_infos
1357            .retain(|sst_info| !sst_info.table_ids.is_empty());
1358        self.recompute_size();
1359    }
1360
1361    /// Return `true` if any SST was actually removed.
1362    fn delete_ssts(&mut self, ids: &HashSet<HummockSstableId>) -> bool {
1363        let original_len = self.table_infos.len();
1364        self.table_infos
1365            .retain(|table| !ids.contains(&table.sst_id));
1366        self.recompute_size();
1367        original_len != self.table_infos.len()
1368    }
1369
1370    /// Truncate specified `table_ids` from each SST's metadata,
1371    /// remove SSTs that become empty, then recompute sizes.
1372    fn truncate_tables(&mut self, table_ids: &HashSet<TableId>) {
1373        for sstable_info in &mut self.table_infos {
1374            let mut inner = sstable_info.get_inner();
1375            inner.table_ids.retain(|id| !table_ids.contains(id));
1376            sstable_info.set_inner(inner);
1377        }
1378        self.normalize();
1379    }
1380}
1381
1382impl OverlappingLevel {
1383    /// Remove empty sub-levels, then recompute aggregated sizes.
1384    fn normalize(&mut self) {
1385        self.sub_levels
1386            .retain(|level| !level.table_infos.is_empty());
1387        self.total_file_size = self
1388            .sub_levels
1389            .iter()
1390            .map(|level| level.total_file_size)
1391            .sum::<u64>();
1392        self.uncompressed_file_size = self
1393            .sub_levels
1394            .iter()
1395            .map(|level| level.uncompressed_file_size)
1396            .sum::<u64>();
1397    }
1398}
1399
1400fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1401    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1402        format!(
1403            "sstable ids: {:?}",
1404            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1405        )
1406    }
1407    operand.total_file_size += insert_table_infos
1408        .iter()
1409        .map(|sst| sst.sst_size)
1410        .sum::<u64>();
1411    operand.uncompressed_file_size += insert_table_infos
1412        .iter()
1413        .map(|sst| sst.uncompressed_file_size)
1414        .sum::<u64>();
1415    if operand.level_type == PbLevelType::Overlapping {
1416        operand.level_type = PbLevelType::Nonoverlapping;
1417        operand
1418            .table_infos
1419            .extend(insert_table_infos.iter().cloned());
1420        operand
1421            .table_infos
1422            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1423        assert!(
1424            can_concat(&operand.table_infos),
1425            "{}",
1426            display_sstable_infos(&operand.table_infos)
1427        );
1428    } else if !insert_table_infos.is_empty() {
1429        let sorted_insert: Vec<_> = insert_table_infos
1430            .iter()
1431            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1432            .cloned()
1433            .collect();
1434        let first = &sorted_insert[0];
1435        let last = &sorted_insert[sorted_insert.len() - 1];
1436        let pos = operand
1437            .table_infos
1438            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1439        if pos >= operand.table_infos.len()
1440            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1441        {
1442            operand.table_infos.splice(pos..pos, sorted_insert);
1443            // Validate the inserted SST batch along with the two SSTs that precede and follow it.
1444            let validate_range = operand
1445                .table_infos
1446                .iter()
1447                .skip(pos.saturating_sub(1))
1448                .take(insert_table_infos.len() + 2)
1449                .collect_vec();
1450            assert!(
1451                can_concat(&validate_range),
1452                "{}",
1453                display_sstable_infos(&validate_range),
1454            );
1455        } else {
1456            // If this branch is reached, it indicates some unexpected behavior in compaction.
1457            // Here we issue a warning and fall back to insert one by one.
1458            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1459            for i in insert_table_infos {
1460                let pos = operand
1461                    .table_infos
1462                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1463                operand.table_infos.insert(pos, i.clone());
1464            }
1465            assert!(
1466                can_concat(&operand.table_infos),
1467                "{}",
1468                display_sstable_infos(&operand.table_infos)
1469            );
1470        }
1471    }
1472}
1473
1474pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1475    // DO NOT REMOVE THIS LINE
1476    // This is to ensure that when adding new variant to `HummockObjectId`,
1477    // the compiler will warn us if we forget to handle it here.
1478    match HummockObjectId::Sstable(0.into()) {
1479        HummockObjectId::Sstable(_) => {}
1480        HummockObjectId::VectorFile(_) => {}
1481        HummockObjectId::HnswGraphFile(_) => {}
1482    };
1483    version
1484        .levels
1485        .values()
1486        .flat_map(|cg| {
1487            cg.level0()
1488                .sub_levels
1489                .iter()
1490                .chain(cg.levels.iter())
1491                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1492        })
1493        .chain(version.table_change_log.values().flat_map(|c| {
1494            c.iter().flat_map(|l| {
1495                l.old_value
1496                    .iter()
1497                    .chain(l.new_value.iter())
1498                    .map(|t| (t.object_id, t.file_size))
1499            })
1500        }))
1501        .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1502        .chain(
1503            version
1504                .vector_indexes
1505                .values()
1506                .flat_map(|index| index.get_objects()),
1507        )
1508        .collect()
1509}
1510
1511/// Verify the validity of a `HummockVersion` and return a list of violations if any.
1512/// Currently this method is only used by risectl validate-version.
1513pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1514    let mut res = Vec::new();
1515    // Ensure each table maps to only one compaction group
1516    for (group_id, levels) in &version.levels {
1517        // Ensure compaction group id matches
1518        if levels.group_id != *group_id {
1519            res.push(format!(
1520                "GROUP {}: inconsistent group id {} in Levels",
1521                group_id, levels.group_id
1522            ));
1523        }
1524
1525        let validate_level = |group: CompactionGroupId,
1526                              expected_level_idx: u32,
1527                              level: &Level,
1528                              res: &mut Vec<String>| {
1529            let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1530            if level.level_idx == 0 {
1531                level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1532                // Ensure sub-level is not empty
1533                if level.table_infos.is_empty() {
1534                    res.push(format!("{}: empty level", level_identifier));
1535                }
1536            } else if level.level_type != PbLevelType::Nonoverlapping {
1537                // Ensure non-L0 level is non-overlapping level
1538                res.push(format!(
1539                    "{}: level type {:?} is not non-overlapping",
1540                    level_identifier, level.level_type
1541                ));
1542            }
1543
1544            // Ensure level idx matches
1545            if level.level_idx != expected_level_idx {
1546                res.push(format!(
1547                    "{}: mismatched level idx {}",
1548                    level_identifier, expected_level_idx
1549                ));
1550            }
1551
1552            let mut prev_table_info: Option<&SstableInfo> = None;
1553            for table_info in &level.table_infos {
1554                // Ensure table_ids are sorted and unique
1555                if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1556                    res.push(format!(
1557                        "{} SST {}: table_ids not sorted",
1558                        level_identifier, table_info.object_id
1559                    ));
1560                }
1561
1562                // Ensure SSTs in non-overlapping level have non-overlapping key range
1563                if level.level_type == PbLevelType::Nonoverlapping {
1564                    if let Some(prev) = prev_table_info.take()
1565                        && prev
1566                            .key_range
1567                            .compare_right_with(&table_info.key_range.left)
1568                            != Ordering::Less
1569                    {
1570                        res.push(format!(
1571                            "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1572                            level_identifier, table_info.object_id, prev, table_info
1573                        ));
1574                    }
1575                    let _ = prev_table_info.insert(table_info);
1576                }
1577            }
1578        };
1579
1580        let l0 = &levels.l0;
1581        let mut prev_sub_level_id = u64::MAX;
1582        for sub_level in &l0.sub_levels {
1583            // Ensure sub_level_id is sorted and unique
1584            if sub_level.sub_level_id >= prev_sub_level_id {
1585                res.push(format!(
1586                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1587                    group_id, sub_level.level_idx, prev_sub_level_id
1588                ));
1589            }
1590            prev_sub_level_id = sub_level.sub_level_id;
1591
1592            validate_level(*group_id, 0, sub_level, &mut res);
1593        }
1594
1595        for idx in 1..=levels.levels.len() {
1596            validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1597        }
1598    }
1599    res
1600}
1601
1602#[cfg(test)]
1603mod tests {
1604    use std::collections::{HashMap, HashSet};
1605
1606    use bytes::Bytes;
1607    use risingwave_common::catalog::TableId;
1608    use risingwave_common::hash::VirtualNode;
1609    use risingwave_common::util::epoch::test_epoch;
1610    use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1611
1612    use super::group_split;
1613    use crate::HummockVersionId;
1614    use crate::compaction_group::group_split::*;
1615    use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1616    use crate::key::{FullKey, gen_key_from_str};
1617    use crate::key_range::KeyRange;
1618    use crate::level::{Level, Levels, OverlappingLevel};
1619    use crate::sstable_info::{SstableInfo, SstableInfoInner};
1620    use crate::version::{
1621        GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1622    };
1623
1624    fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1625        gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1626    }
1627
1628    fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1629        let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1630        let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1631        let full_key_l = FullKey::for_test(
1632            TableId::new(*table_ids.first().unwrap()),
1633            table_key_l,
1634            epoch,
1635        )
1636        .encode();
1637        let full_key_r =
1638            FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1639                .encode();
1640
1641        SstableInfoInner {
1642            sst_id: sst_id.into(),
1643            key_range: KeyRange {
1644                left: full_key_l.into(),
1645                right: full_key_r.into(),
1646                right_exclusive: false,
1647            },
1648            table_ids: table_ids.into_iter().map(Into::into).collect(),
1649            object_id: sst_id.into(),
1650            min_epoch: 20,
1651            max_epoch: 20,
1652            file_size: 100,
1653            sst_size: 100,
1654            ..Default::default()
1655        }
1656    }
1657
1658    #[test]
1659    fn test_get_sst_object_ids() {
1660        let mut version = HummockVersion {
1661            id: HummockVersionId::new(0),
1662            levels: HashMap::from_iter([(
1663                0.into(),
1664                Levels {
1665                    levels: vec![],
1666                    l0: OverlappingLevel {
1667                        sub_levels: vec![],
1668                        total_file_size: 0,
1669                        uncompressed_file_size: 0,
1670                    },
1671                    ..Default::default()
1672                },
1673            )]),
1674            ..Default::default()
1675        };
1676        assert_eq!(version.get_object_ids(false).count(), 0);
1677
1678        // Add to sub level
1679        version
1680            .levels
1681            .get_mut(&0)
1682            .unwrap()
1683            .l0
1684            .sub_levels
1685            .push(Level {
1686                table_infos: vec![
1687                    SstableInfoInner {
1688                        object_id: 11.into(),
1689                        sst_id: 11.into(),
1690                        ..Default::default()
1691                    }
1692                    .into(),
1693                ],
1694                ..Default::default()
1695            });
1696        assert_eq!(version.get_object_ids(false).count(), 1);
1697
1698        // Add to non sub level
1699        version.levels.get_mut(&0).unwrap().levels.push(Level {
1700            table_infos: vec![
1701                SstableInfoInner {
1702                    object_id: 22.into(),
1703                    sst_id: 22.into(),
1704                    ..Default::default()
1705                }
1706                .into(),
1707            ],
1708            ..Default::default()
1709        });
1710        assert_eq!(version.get_object_ids(false).count(), 2);
1711    }
1712
1713    #[test]
1714    fn test_apply_version_delta() {
1715        let mut version = HummockVersion {
1716            id: HummockVersionId::new(0),
1717            levels: HashMap::from_iter([
1718                (
1719                    0.into(),
1720                    build_initial_compaction_group_levels(
1721                        0,
1722                        &CompactionConfig {
1723                            max_level: 6,
1724                            ..Default::default()
1725                        },
1726                    ),
1727                ),
1728                (
1729                    1.into(),
1730                    build_initial_compaction_group_levels(
1731                        1,
1732                        &CompactionConfig {
1733                            max_level: 6,
1734                            ..Default::default()
1735                        },
1736                    ),
1737                ),
1738            ]),
1739            ..Default::default()
1740        };
1741        let version_delta = HummockVersionDelta {
1742            id: HummockVersionId::new(1),
1743            group_deltas: HashMap::from_iter([
1744                (
1745                    2.into(),
1746                    GroupDeltas {
1747                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1748                            group_config: Some(CompactionConfig {
1749                                max_level: 6,
1750                                ..Default::default()
1751                            }),
1752                            ..Default::default()
1753                        }))],
1754                    },
1755                ),
1756                (
1757                    0.into(),
1758                    GroupDeltas {
1759                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1760                    },
1761                ),
1762                (
1763                    1.into(),
1764                    GroupDeltas {
1765                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1766                            1,
1767                            0,
1768                            HashSet::new(),
1769                            vec![
1770                                SstableInfoInner {
1771                                    object_id: 1.into(),
1772                                    sst_id: 1.into(),
1773                                    ..Default::default()
1774                                }
1775                                .into(),
1776                            ],
1777                            0,
1778                            version
1779                                .levels
1780                                .get(&1)
1781                                .as_ref()
1782                                .unwrap()
1783                                .compaction_group_version_id,
1784                        ))],
1785                    },
1786                ),
1787            ]),
1788            ..Default::default()
1789        };
1790        let version_delta = version_delta;
1791
1792        version.apply_version_delta(&version_delta);
1793        let mut cg1 = build_initial_compaction_group_levels(
1794            1,
1795            &CompactionConfig {
1796                max_level: 6,
1797                ..Default::default()
1798            },
1799        );
1800        cg1.levels[0] = Level {
1801            level_idx: 1,
1802            level_type: LevelType::Nonoverlapping,
1803            table_infos: vec![
1804                SstableInfoInner {
1805                    object_id: 1.into(),
1806                    sst_id: 1.into(),
1807                    ..Default::default()
1808                }
1809                .into(),
1810            ],
1811            ..Default::default()
1812        };
1813        assert_eq!(
1814            version,
1815            HummockVersion {
1816                id: HummockVersionId::new(1),
1817                levels: HashMap::from_iter([
1818                    (
1819                        2.into(),
1820                        build_initial_compaction_group_levels(
1821                            2,
1822                            &CompactionConfig {
1823                                max_level: 6,
1824                                ..Default::default()
1825                            },
1826                        ),
1827                    ),
1828                    (1.into(), cg1),
1829                ]),
1830                ..Default::default()
1831            }
1832        );
1833    }
1834
1835    fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1836        gen_sst_info_impl(object_id, table_ids, left, right).into()
1837    }
1838
1839    fn gen_sst_info_impl(
1840        object_id: u64,
1841        table_ids: Vec<u32>,
1842        left: Bytes,
1843        right: Bytes,
1844    ) -> SstableInfoInner {
1845        SstableInfoInner {
1846            object_id: object_id.into(),
1847            sst_id: object_id.into(),
1848            key_range: KeyRange {
1849                left,
1850                right,
1851                right_exclusive: false,
1852            },
1853            table_ids: table_ids.into_iter().map(Into::into).collect(),
1854            file_size: 100,
1855            sst_size: 100,
1856            uncompressed_file_size: 100,
1857            ..Default::default()
1858        }
1859    }
1860
1861    #[test]
1862    fn test_merge_levels() {
1863        let mut left_levels = build_initial_compaction_group_levels(
1864            1,
1865            &CompactionConfig {
1866                max_level: 6,
1867                ..Default::default()
1868            },
1869        );
1870
1871        let mut right_levels = build_initial_compaction_group_levels(
1872            2,
1873            &CompactionConfig {
1874                max_level: 6,
1875                ..Default::default()
1876            },
1877        );
1878
1879        left_levels.levels[0] = Level {
1880            level_idx: 1,
1881            level_type: LevelType::Nonoverlapping,
1882            table_infos: vec![
1883                gen_sst_info(
1884                    1,
1885                    vec![3],
1886                    FullKey::for_test(
1887                        TableId::new(3),
1888                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1889                        0,
1890                    )
1891                    .encode()
1892                    .into(),
1893                    FullKey::for_test(
1894                        TableId::new(3),
1895                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1896                        0,
1897                    )
1898                    .encode()
1899                    .into(),
1900                ),
1901                gen_sst_info(
1902                    10,
1903                    vec![3, 4],
1904                    FullKey::for_test(
1905                        TableId::new(3),
1906                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1907                        0,
1908                    )
1909                    .encode()
1910                    .into(),
1911                    FullKey::for_test(
1912                        TableId::new(4),
1913                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1914                        0,
1915                    )
1916                    .encode()
1917                    .into(),
1918                ),
1919                gen_sst_info(
1920                    11,
1921                    vec![4],
1922                    FullKey::for_test(
1923                        TableId::new(4),
1924                        gen_key_from_str(VirtualNode::from_index(11), "1"),
1925                        0,
1926                    )
1927                    .encode()
1928                    .into(),
1929                    FullKey::for_test(
1930                        TableId::new(4),
1931                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1932                        0,
1933                    )
1934                    .encode()
1935                    .into(),
1936                ),
1937            ],
1938            total_file_size: 300,
1939            ..Default::default()
1940        };
1941
1942        left_levels.l0.sub_levels.push(Level {
1943            level_idx: 0,
1944            table_infos: vec![gen_sst_info(
1945                3,
1946                vec![3],
1947                FullKey::for_test(
1948                    TableId::new(3),
1949                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1950                    0,
1951                )
1952                .encode()
1953                .into(),
1954                FullKey::for_test(
1955                    TableId::new(3),
1956                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1957                    0,
1958                )
1959                .encode()
1960                .into(),
1961            )],
1962            sub_level_id: 101,
1963            level_type: LevelType::Overlapping,
1964            total_file_size: 100,
1965            ..Default::default()
1966        });
1967
1968        left_levels.l0.sub_levels.push(Level {
1969            level_idx: 0,
1970            table_infos: vec![gen_sst_info(
1971                3,
1972                vec![3],
1973                FullKey::for_test(
1974                    TableId::new(3),
1975                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1976                    0,
1977                )
1978                .encode()
1979                .into(),
1980                FullKey::for_test(
1981                    TableId::new(3),
1982                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1983                    0,
1984                )
1985                .encode()
1986                .into(),
1987            )],
1988            sub_level_id: 103,
1989            level_type: LevelType::Overlapping,
1990            total_file_size: 100,
1991            ..Default::default()
1992        });
1993
1994        left_levels.l0.sub_levels.push(Level {
1995            level_idx: 0,
1996            table_infos: vec![gen_sst_info(
1997                3,
1998                vec![3],
1999                FullKey::for_test(
2000                    TableId::new(3),
2001                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2002                    0,
2003                )
2004                .encode()
2005                .into(),
2006                FullKey::for_test(
2007                    TableId::new(3),
2008                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2009                    0,
2010                )
2011                .encode()
2012                .into(),
2013            )],
2014            sub_level_id: 105,
2015            level_type: LevelType::Nonoverlapping,
2016            total_file_size: 100,
2017            ..Default::default()
2018        });
2019
2020        right_levels.levels[0] = Level {
2021            level_idx: 1,
2022            level_type: LevelType::Nonoverlapping,
2023            table_infos: vec![
2024                gen_sst_info(
2025                    1,
2026                    vec![5],
2027                    FullKey::for_test(
2028                        TableId::new(5),
2029                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2030                        0,
2031                    )
2032                    .encode()
2033                    .into(),
2034                    FullKey::for_test(
2035                        TableId::new(5),
2036                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2037                        0,
2038                    )
2039                    .encode()
2040                    .into(),
2041                ),
2042                gen_sst_info(
2043                    10,
2044                    vec![5, 6],
2045                    FullKey::for_test(
2046                        TableId::new(5),
2047                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2048                        0,
2049                    )
2050                    .encode()
2051                    .into(),
2052                    FullKey::for_test(
2053                        TableId::new(6),
2054                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2055                        0,
2056                    )
2057                    .encode()
2058                    .into(),
2059                ),
2060                gen_sst_info(
2061                    11,
2062                    vec![6],
2063                    FullKey::for_test(
2064                        TableId::new(6),
2065                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2066                        0,
2067                    )
2068                    .encode()
2069                    .into(),
2070                    FullKey::for_test(
2071                        TableId::new(6),
2072                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2073                        0,
2074                    )
2075                    .encode()
2076                    .into(),
2077                ),
2078            ],
2079            total_file_size: 300,
2080            ..Default::default()
2081        };
2082
2083        right_levels.l0.sub_levels.push(Level {
2084            level_idx: 0,
2085            table_infos: vec![gen_sst_info(
2086                3,
2087                vec![5],
2088                FullKey::for_test(
2089                    TableId::new(5),
2090                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2091                    0,
2092                )
2093                .encode()
2094                .into(),
2095                FullKey::for_test(
2096                    TableId::new(5),
2097                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2098                    0,
2099                )
2100                .encode()
2101                .into(),
2102            )],
2103            sub_level_id: 101,
2104            level_type: LevelType::Overlapping,
2105            total_file_size: 100,
2106            ..Default::default()
2107        });
2108
2109        right_levels.l0.sub_levels.push(Level {
2110            level_idx: 0,
2111            table_infos: vec![gen_sst_info(
2112                5,
2113                vec![5],
2114                FullKey::for_test(
2115                    TableId::new(5),
2116                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2117                    0,
2118                )
2119                .encode()
2120                .into(),
2121                FullKey::for_test(
2122                    TableId::new(5),
2123                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2124                    0,
2125                )
2126                .encode()
2127                .into(),
2128            )],
2129            sub_level_id: 102,
2130            level_type: LevelType::Overlapping,
2131            total_file_size: 100,
2132            ..Default::default()
2133        });
2134
2135        right_levels.l0.sub_levels.push(Level {
2136            level_idx: 0,
2137            table_infos: vec![gen_sst_info(
2138                3,
2139                vec![5],
2140                FullKey::for_test(
2141                    TableId::new(5),
2142                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2143                    0,
2144                )
2145                .encode()
2146                .into(),
2147                FullKey::for_test(
2148                    TableId::new(5),
2149                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2150                    0,
2151                )
2152                .encode()
2153                .into(),
2154            )],
2155            sub_level_id: 103,
2156            level_type: LevelType::Nonoverlapping,
2157            total_file_size: 100,
2158            ..Default::default()
2159        });
2160
2161        {
2162            // test empty
2163            let mut left_levels = Levels::default();
2164            let right_levels = Levels::default();
2165
2166            group_split::merge_levels(&mut left_levels, right_levels);
2167        }
2168
2169        {
2170            // test empty left
2171            let mut left_levels = build_initial_compaction_group_levels(
2172                1,
2173                &CompactionConfig {
2174                    max_level: 6,
2175                    ..Default::default()
2176                },
2177            );
2178            let right_levels = right_levels.clone();
2179
2180            group_split::merge_levels(&mut left_levels, right_levels);
2181
2182            assert!(left_levels.l0.sub_levels.len() == 3);
2183            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2184            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2185            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2186            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2187            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2188            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2189
2190            assert!(left_levels.levels[0].level_idx == 1);
2191            assert_eq!(300, left_levels.levels[0].total_file_size);
2192        }
2193
2194        {
2195            // test empty right
2196            let mut left_levels = left_levels.clone();
2197            let right_levels = build_initial_compaction_group_levels(
2198                2,
2199                &CompactionConfig {
2200                    max_level: 6,
2201                    ..Default::default()
2202                },
2203            );
2204
2205            group_split::merge_levels(&mut left_levels, right_levels);
2206
2207            assert!(left_levels.l0.sub_levels.len() == 3);
2208            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2209            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2210            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2211            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2212            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2213            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2214
2215            assert!(left_levels.levels[0].level_idx == 1);
2216            assert_eq!(300, left_levels.levels[0].total_file_size);
2217        }
2218
2219        {
2220            let mut left_levels = left_levels.clone();
2221            let right_levels = right_levels.clone();
2222
2223            group_split::merge_levels(&mut left_levels, right_levels);
2224
2225            assert!(left_levels.l0.sub_levels.len() == 6);
2226            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2227            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2228            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2229            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2230            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2231            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2232            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2233            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2234            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2235            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2236            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2237            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2238
2239            assert!(left_levels.levels[0].level_idx == 1);
2240            assert_eq!(600, left_levels.levels[0].total_file_size);
2241        }
2242    }
2243
2244    #[test]
2245    fn test_get_split_pos() {
2246        let epoch = test_epoch(1);
2247        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2248        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2249        let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2250
2251        let ssts = vec![s1, s2, s3];
2252        let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2253
2254        let pos = group_split::get_split_pos(&ssts, split_key.clone());
2255        assert_eq!(1, pos);
2256
2257        let pos = group_split::get_split_pos(&vec![], split_key);
2258        assert_eq!(0, pos);
2259    }
2260
2261    #[test]
2262    fn test_split_sst() {
2263        let epoch = test_epoch(1);
2264        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2265
2266        {
2267            let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
2268            let origin_sst = sst.clone();
2269            let sst_size = origin_sst.sst_size;
2270            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2271            assert_eq!(SstSplitType::Both, split_type);
2272
2273            let mut new_sst_id = 10.into();
2274            let (origin_sst, branched_sst) = group_split::split_sst(
2275                origin_sst,
2276                &mut new_sst_id,
2277                split_key,
2278                sst_size / 2,
2279                sst_size / 2,
2280            );
2281
2282            let origin_sst = origin_sst.unwrap();
2283            let branched_sst = branched_sst.unwrap();
2284
2285            assert!(origin_sst.key_range.right_exclusive);
2286            assert!(
2287                origin_sst
2288                    .key_range
2289                    .right
2290                    .cmp(&branched_sst.key_range.left)
2291                    .is_le()
2292            );
2293            assert!(origin_sst.table_ids.is_sorted());
2294            assert!(branched_sst.table_ids.is_sorted());
2295            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2296            assert!(branched_sst.sst_size < origin_sst.file_size);
2297            assert_eq!(10, branched_sst.sst_id);
2298            assert_eq!(11, origin_sst.sst_id);
2299            assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2300        }
2301
2302        {
2303            // test un-exist table_id
2304            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2305            let origin_sst = sst.clone();
2306            let sst_size = origin_sst.sst_size;
2307            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2308            assert_eq!(SstSplitType::Both, split_type);
2309
2310            let mut new_sst_id = 10.into();
2311            let (origin_sst, branched_sst) = group_split::split_sst(
2312                origin_sst,
2313                &mut new_sst_id,
2314                split_key,
2315                sst_size / 2,
2316                sst_size / 2,
2317            );
2318
2319            let origin_sst = origin_sst.unwrap();
2320            let branched_sst = branched_sst.unwrap();
2321
2322            assert!(origin_sst.key_range.right_exclusive);
2323            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2324            assert!(origin_sst.table_ids.is_sorted());
2325            assert!(branched_sst.table_ids.is_sorted());
2326            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2327            assert!(branched_sst.sst_size < origin_sst.file_size);
2328            assert_eq!(10, branched_sst.sst_id);
2329            assert_eq!(11, origin_sst.sst_id);
2330            assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2331        }
2332
2333        {
2334            let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
2335            let split_type = group_split::need_to_split(&sst, split_key);
2336            assert_eq!(SstSplitType::Left, split_type);
2337        }
2338
2339        {
2340            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2341            let origin_sst = sst.clone();
2342            let split_type = group_split::need_to_split(&origin_sst, split_key);
2343            assert_eq!(SstSplitType::Both, split_type);
2344
2345            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2346            let origin_sst = sst;
2347            let split_type = group_split::need_to_split(&origin_sst, split_key);
2348            assert_eq!(SstSplitType::Right, split_type);
2349        }
2350
2351        {
2352            // test key_range left = right
2353            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2354            sst.key_range.right = sst.key_range.left.clone();
2355            let sst: SstableInfo = sst.into();
2356            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2357            let origin_sst = sst;
2358            let sst_size = origin_sst.sst_size;
2359
2360            let mut new_sst_id = 10.into();
2361            let (origin_sst, branched_sst) = group_split::split_sst(
2362                origin_sst,
2363                &mut new_sst_id,
2364                split_key,
2365                sst_size / 2,
2366                sst_size / 2,
2367            );
2368
2369            assert!(origin_sst.is_none());
2370            assert!(branched_sst.is_some());
2371        }
2372    }
2373
2374    #[test]
2375    fn test_split_sst_info_for_level() {
2376        let mut version = HummockVersion {
2377            id: HummockVersionId::new(0),
2378            levels: HashMap::from_iter([(
2379                1.into(),
2380                build_initial_compaction_group_levels(
2381                    1,
2382                    &CompactionConfig {
2383                        max_level: 6,
2384                        ..Default::default()
2385                    },
2386                ),
2387            )]),
2388            ..Default::default()
2389        };
2390
2391        let cg1 = version.levels.get_mut(&1).unwrap();
2392
2393        cg1.levels[0] = Level {
2394            level_idx: 1,
2395            level_type: LevelType::Nonoverlapping,
2396            table_infos: vec![
2397                gen_sst_info(
2398                    1,
2399                    vec![3],
2400                    FullKey::for_test(
2401                        TableId::new(3),
2402                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2403                        0,
2404                    )
2405                    .encode()
2406                    .into(),
2407                    FullKey::for_test(
2408                        TableId::new(3),
2409                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2410                        0,
2411                    )
2412                    .encode()
2413                    .into(),
2414                ),
2415                gen_sst_info(
2416                    10,
2417                    vec![3, 4],
2418                    FullKey::for_test(
2419                        TableId::new(3),
2420                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2421                        0,
2422                    )
2423                    .encode()
2424                    .into(),
2425                    FullKey::for_test(
2426                        TableId::new(4),
2427                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2428                        0,
2429                    )
2430                    .encode()
2431                    .into(),
2432                ),
2433                gen_sst_info(
2434                    11,
2435                    vec![4],
2436                    FullKey::for_test(
2437                        TableId::new(4),
2438                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2439                        0,
2440                    )
2441                    .encode()
2442                    .into(),
2443                    FullKey::for_test(
2444                        TableId::new(4),
2445                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2446                        0,
2447                    )
2448                    .encode()
2449                    .into(),
2450                ),
2451            ],
2452            total_file_size: 300,
2453            ..Default::default()
2454        };
2455
2456        cg1.l0.sub_levels.push(Level {
2457            level_idx: 0,
2458            table_infos: vec![
2459                gen_sst_info(
2460                    2,
2461                    vec![2],
2462                    FullKey::for_test(
2463                        TableId::new(0),
2464                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2465                        0,
2466                    )
2467                    .encode()
2468                    .into(),
2469                    FullKey::for_test(
2470                        TableId::new(2),
2471                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2472                        0,
2473                    )
2474                    .encode()
2475                    .into(),
2476                ),
2477                gen_sst_info(
2478                    22,
2479                    vec![2],
2480                    FullKey::for_test(
2481                        TableId::new(0),
2482                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2483                        0,
2484                    )
2485                    .encode()
2486                    .into(),
2487                    FullKey::for_test(
2488                        TableId::new(2),
2489                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2490                        0,
2491                    )
2492                    .encode()
2493                    .into(),
2494                ),
2495                gen_sst_info(
2496                    23,
2497                    vec![2],
2498                    FullKey::for_test(
2499                        TableId::new(0),
2500                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2501                        0,
2502                    )
2503                    .encode()
2504                    .into(),
2505                    FullKey::for_test(
2506                        TableId::new(2),
2507                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2508                        0,
2509                    )
2510                    .encode()
2511                    .into(),
2512                ),
2513                gen_sst_info(
2514                    24,
2515                    vec![2],
2516                    FullKey::for_test(
2517                        TableId::new(2),
2518                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2519                        0,
2520                    )
2521                    .encode()
2522                    .into(),
2523                    FullKey::for_test(
2524                        TableId::new(2),
2525                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2526                        0,
2527                    )
2528                    .encode()
2529                    .into(),
2530                ),
2531                gen_sst_info(
2532                    25,
2533                    vec![2],
2534                    FullKey::for_test(
2535                        TableId::new(0),
2536                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2537                        0,
2538                    )
2539                    .encode()
2540                    .into(),
2541                    FullKey::for_test(
2542                        TableId::new(0),
2543                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2544                        0,
2545                    )
2546                    .encode()
2547                    .into(),
2548                ),
2549            ],
2550            sub_level_id: 101,
2551            level_type: LevelType::Overlapping,
2552            total_file_size: 300,
2553            ..Default::default()
2554        });
2555
2556        {
2557            // split Overlapping level
2558            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2559
2560            let mut new_sst_id = 100.into();
2561            let x = group_split::split_sst_info_for_level_v2(
2562                &mut cg1.l0.sub_levels[0],
2563                &mut new_sst_id,
2564                split_key,
2565            );
2566            // assert_eq!(3, x.len());
2567            // assert_eq!(100, x[0].sst_id);
2568            // assert_eq!(100, x[0].sst_size);
2569            // assert_eq!(101, x[1].sst_id);
2570            // assert_eq!(100, x[1].sst_size);
2571            // assert_eq!(102, x[2].sst_id);
2572            // assert_eq!(100, x[2].sst_size);
2573
2574            let mut right_l0 = OverlappingLevel {
2575                sub_levels: vec![],
2576                total_file_size: 0,
2577                uncompressed_file_size: 0,
2578            };
2579
2580            right_l0.sub_levels.push(Level {
2581                level_idx: 0,
2582                table_infos: x,
2583                sub_level_id: 101,
2584                total_file_size: 100,
2585                level_type: LevelType::Overlapping,
2586                ..Default::default()
2587            });
2588
2589            let right_levels = Levels {
2590                levels: vec![],
2591                l0: right_l0,
2592                ..Default::default()
2593            };
2594
2595            merge_levels(cg1, right_levels);
2596        }
2597
2598        {
2599            // test split empty level
2600            let mut new_sst_id = 100.into();
2601            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2602            let x = group_split::split_sst_info_for_level_v2(
2603                &mut cg1.levels[2],
2604                &mut new_sst_id,
2605                split_key,
2606            );
2607
2608            assert!(x.is_empty());
2609        }
2610
2611        {
2612            // test split to right Nonoverlapping level
2613            let mut cg1 = cg1.clone();
2614            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2615
2616            let mut new_sst_id = 100.into();
2617            let x = group_split::split_sst_info_for_level_v2(
2618                &mut cg1.levels[0],
2619                &mut new_sst_id,
2620                split_key,
2621            );
2622
2623            assert_eq!(3, x.len());
2624            assert_eq!(1, x[0].sst_id);
2625            assert_eq!(100, x[0].sst_size);
2626            assert_eq!(10, x[1].sst_id);
2627            assert_eq!(100, x[1].sst_size);
2628            assert_eq!(11, x[2].sst_id);
2629            assert_eq!(100, x[2].sst_size);
2630
2631            assert_eq!(0, cg1.levels[0].table_infos.len());
2632        }
2633
2634        {
2635            // test split to left Nonoverlapping level
2636            let mut cg1 = cg1.clone();
2637            let split_key = group_split::build_split_key(5.into(), VirtualNode::ZERO);
2638
2639            let mut new_sst_id = 100.into();
2640            let x = group_split::split_sst_info_for_level_v2(
2641                &mut cg1.levels[0],
2642                &mut new_sst_id,
2643                split_key,
2644            );
2645
2646            assert_eq!(0, x.len());
2647            assert_eq!(3, cg1.levels[0].table_infos.len());
2648        }
2649
2650        // {
2651        //     // test split to both Nonoverlapping level
2652        //     let mut cg1 = cg1.clone();
2653        //     let split_key = build_split_key(3, VirtualNode::MAX);
2654
2655        //     let mut new_sst_id = 100;
2656        //     let x = group_split::split_sst_info_for_level_v2(
2657        //         &mut cg1.levels[0],
2658        //         &mut new_sst_id,
2659        //         split_key,
2660        //     );
2661
2662        //     assert_eq!(2, x.len());
2663        //     assert_eq!(100, x[0].sst_id);
2664        //     assert_eq!(100 / 2, x[0].sst_size);
2665        //     assert_eq!(11, x[1].sst_id);
2666        //     assert_eq!(100, x[1].sst_size);
2667        //     assert_eq!(vec![3, 4], x[0].table_ids);
2668
2669        //     assert_eq!(2, cg1.levels[0].table_infos.len());
2670        //     assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2671        //     assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2672        //     assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2673        // }
2674
2675        {
2676            // test split to both Nonoverlapping level
2677            let mut cg1 = cg1.clone();
2678            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2679
2680            let mut new_sst_id = 100.into();
2681            let x = group_split::split_sst_info_for_level_v2(
2682                &mut cg1.levels[0],
2683                &mut new_sst_id,
2684                split_key,
2685            );
2686
2687            assert_eq!(2, x.len());
2688            assert_eq!(100, x[0].sst_id);
2689            assert_eq!(100 / 2, x[0].sst_size);
2690            assert_eq!(11, x[1].sst_id);
2691            assert_eq!(100, x[1].sst_size);
2692            assert_eq!(vec![TableId::new(4)], x[1].table_ids);
2693
2694            assert_eq!(2, cg1.levels[0].table_infos.len());
2695            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2696            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2697            assert_eq!(
2698                vec![TableId::new(3)],
2699                cg1.levels[0].table_infos[1].table_ids
2700            );
2701        }
2702    }
2703
2704    fn make_sst(sst_id: u64, table_ids: Vec<u32>, sst_size: u64) -> SstableInfo {
2705        SstableInfoInner {
2706            sst_id: sst_id.into(),
2707            object_id: sst_id.into(),
2708            table_ids: table_ids.into_iter().map(TableId::new).collect(),
2709            file_size: sst_size,
2710            sst_size,
2711            uncompressed_file_size: sst_size * 2,
2712            ..Default::default()
2713        }
2714        .into()
2715    }
2716
2717    #[test]
2718    fn test_level_normalize() {
2719        // Mixed: some SSTs empty, some not.
2720        let mut level = Level {
2721            level_idx: 1,
2722            level_type: LevelType::Nonoverlapping,
2723            table_infos: vec![
2724                make_sst(1, vec![1, 2], 100),
2725                make_sst(2, vec![], 200), // empty → removed
2726                make_sst(3, vec![3], 300),
2727                make_sst(4, vec![], 400), // empty → removed
2728            ],
2729            total_file_size: 9999, // intentionally wrong
2730            uncompressed_file_size: 9999,
2731            ..Default::default()
2732        };
2733
2734        level.normalize();
2735
2736        assert_eq!(level.table_infos.len(), 2);
2737        assert_eq!(1, level.table_infos[0].sst_id);
2738        assert_eq!(3, level.table_infos[1].sst_id);
2739        assert_eq!(level.total_file_size, 400);
2740        assert_eq!(level.uncompressed_file_size, 800);
2741
2742        // No empty SSTs: just recompute sizes.
2743        level.total_file_size = 0;
2744        level.uncompressed_file_size = 0;
2745        level.normalize();
2746        assert_eq!(level.table_infos.len(), 2);
2747        assert_eq!(level.total_file_size, 400);
2748        assert_eq!(level.uncompressed_file_size, 800);
2749
2750        // All empty → level becomes empty.
2751        level.table_infos = vec![make_sst(10, vec![], 100), make_sst(11, vec![], 200)];
2752        level.normalize();
2753        assert!(level.table_infos.is_empty());
2754        assert_eq!(level.total_file_size, 0);
2755        assert_eq!(level.uncompressed_file_size, 0);
2756    }
2757
2758    #[test]
2759    fn test_level_delete_ssts() {
2760        let mut level = Level {
2761            level_idx: 1,
2762            level_type: LevelType::Nonoverlapping,
2763            table_infos: vec![
2764                make_sst(1, vec![1], 100),
2765                make_sst(2, vec![2], 200),
2766                make_sst(3, vec![3], 300),
2767            ],
2768            total_file_size: 600,
2769            uncompressed_file_size: 1200,
2770            ..Default::default()
2771        };
2772
2773        let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([2.into()]);
2774        let changed = level.delete_ssts(&delete_ids);
2775
2776        assert!(changed);
2777        assert_eq!(level.table_infos.len(), 2);
2778        assert_eq!(1, level.table_infos[0].sst_id);
2779        assert_eq!(3, level.table_infos[1].sst_id);
2780        assert_eq!(level.total_file_size, 400);
2781        assert_eq!(level.uncompressed_file_size, 800);
2782
2783        // Delete non-existent id → no change, returns false.
2784        let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([999.into()]);
2785        let changed = level.delete_ssts(&delete_ids);
2786        assert!(!changed);
2787        assert_eq!(level.table_infos.len(), 2);
2788    }
2789
2790    #[test]
2791    fn test_level_truncate_tables() {
2792        let mut level = Level {
2793            level_idx: 1,
2794            level_type: LevelType::Nonoverlapping,
2795            table_infos: vec![
2796                make_sst(1, vec![1, 2], 100),
2797                make_sst(2, vec![2, 3], 200),
2798                make_sst(3, vec![2], 300),
2799            ],
2800            total_file_size: 600,
2801            uncompressed_file_size: 1200,
2802            ..Default::default()
2803        };
2804
2805        let truncate_ids = HashSet::from([TableId::new(2)]);
2806        level.truncate_tables(&truncate_ids);
2807
2808        // SST 3 should be removed (was only table_id=2)
2809        assert_eq!(level.table_infos.len(), 2);
2810        assert_eq!(level.table_infos[0].table_ids, vec![TableId::new(1)]);
2811        assert_eq!(level.table_infos[1].table_ids, vec![TableId::new(3)]);
2812        assert_eq!(level.total_file_size, 100 + 200);
2813        assert_eq!(level.uncompressed_file_size, 200 + 400);
2814
2815        // Truncate remaining tables → all SSTs removed.
2816        let truncate_ids = HashSet::from([TableId::new(1), TableId::new(3)]);
2817        level.truncate_tables(&truncate_ids);
2818        assert!(level.table_infos.is_empty());
2819        assert_eq!(level.total_file_size, 0);
2820        assert_eq!(level.uncompressed_file_size, 0);
2821    }
2822
2823    #[test]
2824    fn test_overlapping_level_normalize() {
2825        let mut l0 = OverlappingLevel {
2826            sub_levels: vec![
2827                Level {
2828                    level_idx: 0,
2829                    table_infos: vec![make_sst(1, vec![1], 100)],
2830                    total_file_size: 100,
2831                    uncompressed_file_size: 200,
2832                    sub_level_id: 1,
2833                    ..Default::default()
2834                },
2835                Level {
2836                    level_idx: 0,
2837                    table_infos: vec![], // empty → should be removed
2838                    total_file_size: 0,
2839                    uncompressed_file_size: 0,
2840                    sub_level_id: 2,
2841                    ..Default::default()
2842                },
2843                Level {
2844                    level_idx: 0,
2845                    table_infos: vec![make_sst(3, vec![3], 300)],
2846                    total_file_size: 300,
2847                    uncompressed_file_size: 600,
2848                    sub_level_id: 3,
2849                    ..Default::default()
2850                },
2851            ],
2852            total_file_size: 9999, // intentionally wrong
2853            uncompressed_file_size: 9999,
2854        };
2855
2856        l0.normalize();
2857
2858        assert_eq!(l0.sub_levels.len(), 2);
2859        assert_eq!(l0.sub_levels[0].sub_level_id, 1);
2860        assert_eq!(l0.sub_levels[1].sub_level_id, 3);
2861        assert_eq!(l0.total_file_size, 100 + 300);
2862        assert_eq!(l0.uncompressed_file_size, 200 + 600);
2863
2864        // All sub-levels empty → normalize clears everything.
2865        l0.sub_levels = vec![Level {
2866            level_idx: 0,
2867            table_infos: vec![],
2868            ..Default::default()
2869        }];
2870        l0.normalize();
2871        assert!(l0.sub_levels.is_empty());
2872        assert_eq!(l0.total_file_size, 0);
2873        assert_eq!(l0.uncompressed_file_size, 0);
2874    }
2875
2876    #[test]
2877    fn test_levels_truncate_tables() {
2878        #[expect(deprecated)]
2879        let mut levels = Levels {
2880            l0: OverlappingLevel {
2881                sub_levels: vec![
2882                    Level {
2883                        level_idx: 0,
2884                        table_infos: vec![
2885                            make_sst(1, vec![10], 100), // table 10
2886                            make_sst(2, vec![20], 200), // table 20
2887                        ],
2888                        total_file_size: 300,
2889                        uncompressed_file_size: 600,
2890                        sub_level_id: 1,
2891                        ..Default::default()
2892                    },
2893                    Level {
2894                        level_idx: 0,
2895                        table_infos: vec![
2896                            make_sst(3, vec![10], 150), // table 10
2897                        ],
2898                        total_file_size: 150,
2899                        uncompressed_file_size: 300,
2900                        sub_level_id: 2,
2901                        ..Default::default()
2902                    },
2903                ],
2904                total_file_size: 450,
2905                uncompressed_file_size: 900,
2906            },
2907            levels: vec![Level {
2908                level_idx: 1,
2909                level_type: LevelType::Nonoverlapping,
2910                table_infos: vec![
2911                    make_sst(4, vec![10, 20], 400), // shared SST
2912                    make_sst(5, vec![10], 500),     // table 10 only
2913                ],
2914                total_file_size: 900,
2915                uncompressed_file_size: 1800,
2916                ..Default::default()
2917            }],
2918            group_id: 1.into(),
2919            parent_group_id: 0.into(),
2920            member_table_ids: vec![],
2921            compaction_group_version_id: 0,
2922        };
2923
2924        // Truncate table 10
2925        levels.truncate_tables(&HashSet::from([TableId::new(10)]));
2926
2927        assert_eq!(levels.l0.sub_levels.len(), 1);
2928        assert_eq!(levels.l0.sub_levels[0].sub_level_id, 1);
2929        assert_eq!(levels.l0.sub_levels[0].table_infos.len(), 1);
2930        assert_eq!(2, levels.l0.sub_levels[0].table_infos[0].sst_id);
2931        assert_eq!(levels.l0.sub_levels[0].total_file_size, 200);
2932        assert_eq!(levels.l0.sub_levels[0].uncompressed_file_size, 400);
2933
2934        assert_eq!(levels.l0.total_file_size, 200);
2935        assert_eq!(levels.l0.uncompressed_file_size, 400);
2936
2937        assert_eq!(levels.levels[0].table_infos.len(), 1);
2938        assert_eq!(4, levels.levels[0].table_infos[0].sst_id);
2939        assert_eq!(
2940            levels.levels[0].table_infos[0].table_ids,
2941            vec![TableId::new(20)]
2942        );
2943        assert_eq!(levels.levels[0].total_file_size, 400);
2944        assert_eq!(levels.levels[0].uncompressed_file_size, 800);
2945
2946        assert_eq!(levels.compaction_group_version_id, 1);
2947    }
2948
2949    #[test]
2950    fn test_apply_version_delta_truncate_tables() {
2951        let mut version = HummockVersion {
2952            id: HummockVersionId::new(0),
2953            levels: HashMap::from_iter([(1.into(), {
2954                #[expect(deprecated)]
2955                let levels = Levels {
2956                    l0: OverlappingLevel {
2957                        sub_levels: vec![
2958                            Level {
2959                                level_idx: 0,
2960                                level_type: LevelType::Overlapping,
2961                                table_infos: vec![
2962                                    make_sst(1, vec![100], 50), // only table 100
2963                                    make_sst(2, vec![200], 60), // only table 200
2964                                ],
2965                                total_file_size: 110,
2966                                uncompressed_file_size: 220,
2967                                sub_level_id: 1,
2968                                ..Default::default()
2969                            },
2970                            Level {
2971                                level_idx: 0,
2972                                level_type: LevelType::Overlapping,
2973                                table_infos: vec![
2974                                    make_sst(3, vec![100], 70), // only table 100
2975                                ],
2976                                total_file_size: 70,
2977                                uncompressed_file_size: 140,
2978                                sub_level_id: 2,
2979                                ..Default::default()
2980                            },
2981                        ],
2982                        total_file_size: 180,
2983                        uncompressed_file_size: 360,
2984                    },
2985                    levels: vec![Level {
2986                        level_idx: 1,
2987                        level_type: LevelType::Nonoverlapping,
2988                        table_infos: vec![
2989                            make_sst(4, vec![100, 200], 80), // shared
2990                            make_sst(5, vec![100], 90),      // only table 100
2991                        ],
2992                        total_file_size: 170,
2993                        uncompressed_file_size: 340,
2994                        ..Default::default()
2995                    }],
2996                    group_id: 1.into(),
2997                    parent_group_id: 0.into(),
2998                    member_table_ids: vec![],
2999                    compaction_group_version_id: 0,
3000                };
3001                levels
3002            })]),
3003            ..Default::default()
3004        };
3005
3006        let version_delta = HummockVersionDelta {
3007            id: HummockVersionId::new(1),
3008            group_deltas: HashMap::from_iter([(
3009                1.into(),
3010                GroupDeltas {
3011                    group_deltas: vec![GroupDelta::TruncateTables(HashSet::from([TableId::new(
3012                        100,
3013                    )]))],
3014                },
3015            )]),
3016            ..Default::default()
3017        };
3018
3019        version.apply_version_delta(&version_delta);
3020
3021        let cg = version.get_compaction_group_levels(1.into());
3022
3023        assert_eq!(
3024            cg.l0.sub_levels.len(),
3025            1,
3026            "empty sub-level should be removed"
3027        );
3028        assert_eq!(cg.l0.sub_levels[0].sub_level_id, 1);
3029        assert_eq!(cg.l0.sub_levels[0].table_infos.len(), 1);
3030        assert_eq!(2, cg.l0.sub_levels[0].table_infos[0].sst_id);
3031        for sub_level in &cg.l0.sub_levels {
3032            for sst in &sub_level.table_infos {
3033                assert!(
3034                    !sst.table_ids.is_empty(),
3035                    "SST {} should not have empty table_ids",
3036                    sst.sst_id
3037                );
3038            }
3039        }
3040
3041        assert_eq!(cg.l0.total_file_size, 60);
3042        assert_eq!(cg.l0.uncompressed_file_size, 120);
3043
3044        assert_eq!(cg.levels[0].table_infos.len(), 1);
3045        assert_eq!(4, cg.levels[0].table_infos[0].sst_id);
3046        assert_eq!(
3047            cg.levels[0].table_infos[0].table_ids,
3048            vec![TableId::new(200)]
3049        );
3050        assert_eq!(cg.levels[0].total_file_size, 80);
3051        assert_eq!(cg.levels[0].uncompressed_file_size, 160);
3052
3053        assert_eq!(cg.compaction_group_version_id, 1);
3054    }
3055
3056    #[test]
3057    fn test_apply_version_delta_compact_l0() {
3058        let mut version = HummockVersion {
3059            id: HummockVersionId::new(0),
3060            levels: HashMap::from_iter([(1.into(), {
3061                #[expect(deprecated)]
3062                let levels = Levels {
3063                    l0: OverlappingLevel {
3064                        sub_levels: vec![
3065                            Level {
3066                                level_idx: 0,
3067                                level_type: LevelType::Nonoverlapping,
3068                                table_infos: vec![
3069                                    make_sst(1, vec![1], 100),
3070                                    make_sst(2, vec![2], 200),
3071                                ],
3072                                total_file_size: 300,
3073                                uncompressed_file_size: 600,
3074                                sub_level_id: 1,
3075                                ..Default::default()
3076                            },
3077                            Level {
3078                                level_idx: 0,
3079                                level_type: LevelType::Nonoverlapping,
3080                                table_infos: vec![make_sst(3, vec![3], 300)],
3081                                total_file_size: 300,
3082                                uncompressed_file_size: 600,
3083                                sub_level_id: 2,
3084                                ..Default::default()
3085                            },
3086                        ],
3087                        total_file_size: 600,
3088                        uncompressed_file_size: 1200,
3089                    },
3090                    levels: vec![Level {
3091                        level_idx: 1,
3092                        level_type: LevelType::Nonoverlapping,
3093                        table_infos: vec![],
3094                        total_file_size: 0,
3095                        uncompressed_file_size: 0,
3096                        ..Default::default()
3097                    }],
3098                    group_id: 1.into(),
3099                    parent_group_id: 0.into(),
3100                    member_table_ids: vec![],
3101                    compaction_group_version_id: 0,
3102                };
3103                levels
3104            })]),
3105            ..Default::default()
3106        };
3107
3108        let version_delta = HummockVersionDelta {
3109            id: HummockVersionId::new(1),
3110            group_deltas: HashMap::from_iter([(
3111                1.into(),
3112                GroupDeltas {
3113                    group_deltas: vec![
3114                        GroupDelta::IntraLevel(IntraLevelDelta::new(
3115                            0, // L0
3116                            0,
3117                            HashSet::from([1.into(), 2.into(), 3.into()]),
3118                            vec![],
3119                            0,
3120                            0,
3121                        )),
3122                        GroupDelta::IntraLevel(IntraLevelDelta::new(
3123                            1, // L1
3124                            0,
3125                            HashSet::new(),
3126                            vec![make_sst(10, vec![1, 2, 3], 500)],
3127                            0,
3128                            0,
3129                        )),
3130                    ],
3131                },
3132            )]),
3133            ..Default::default()
3134        };
3135
3136        version.apply_version_delta(&version_delta);
3137
3138        let cg = version.get_compaction_group_levels(1.into());
3139
3140        assert!(cg.l0.sub_levels.is_empty());
3141        assert_eq!(cg.l0.total_file_size, 0);
3142        assert_eq!(cg.l0.uncompressed_file_size, 0);
3143
3144        assert_eq!(cg.levels[0].table_infos.len(), 1);
3145        assert_eq!(10, cg.levels[0].table_infos[0].sst_id);
3146        assert_eq!(cg.levels[0].total_file_size, 500);
3147    }
3148}