risingwave_hummock_sdk/compaction_group/
hummock_version_ext.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_pb::hummock::{
27    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
28};
29use tracing::warn;
30
31use super::group_split::split_sst_with_table_ids;
32use super::{StateTableId, group_split};
33use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
34use crate::compact_task::is_compaction_task_expired;
35use crate::compaction_group::StaticCompactionGroupId;
36use crate::key_range::KeyRangeCommon;
37use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
38use crate::sstable_info::SstableInfo;
39use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
40use crate::vector_index::apply_vector_index_delta;
41use crate::version::{
42    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
43    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
44};
45use crate::{
46    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
47};
48
/// Summary of the SST changes that a version delta applies to one compaction group.
///
/// Instances are produced by `build_sst_delta_infos`.
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    /// Level index recorded for the inserted SSTs (`0` when they come from a new L0 sub-level).
    pub insert_sst_level: u32,
    /// SSTs inserted by the deltas of the group.
    pub insert_sst_infos: Vec<SstableInfo>,
    /// Object ids of the SSTs that the deltas of the group remove.
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}
55
/// Maps a compaction group id to a list of SST ids.
///
/// NOTE(review): presumably the per-group SST ids that share one branched object; confirm
/// against the users of this alias.
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
57
58impl<L> HummockVersionCommon<SstableInfo, L> {
59    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
60        self.levels
61            .get(&compaction_group_id)
62            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
63    }
64
65    pub fn get_compaction_group_levels_mut(
66        &mut self,
67        compaction_group_id: CompactionGroupId,
68    ) -> &mut Levels {
69        self.levels
70            .get_mut(&compaction_group_id)
71            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
72    }
73
74    // only scan the sst infos from levels in the specified compaction group (without table change log)
75    pub fn get_sst_ids_by_group_id(
76        &self,
77        compaction_group_id: CompactionGroupId,
78    ) -> impl Iterator<Item = HummockSstableId> + '_ {
79        self.levels
80            .iter()
81            .filter_map(move |(cg_id, level)| {
82                if *cg_id == compaction_group_id {
83                    Some(level)
84                } else {
85                    None
86                }
87            })
88            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
89            .flat_map(|level| level.table_infos.iter())
90            .map(|s| s.sst_id)
91    }
92
93    pub fn level_iter<F: FnMut(&Level) -> bool>(
94        &self,
95        compaction_group_id: CompactionGroupId,
96        mut f: F,
97    ) {
98        if let Some(levels) = self.levels.get(&compaction_group_id) {
99            for sub_level in &levels.l0.sub_levels {
100                if !f(sub_level) {
101                    return;
102                }
103            }
104            for level in &levels.levels {
105                if !f(level) {
106                    return;
107                }
108            }
109        }
110    }
111
112    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
113        // l0 is currently separated from all levels
114        self.levels
115            .get(&compaction_group_id)
116            .map(|group| group.levels.len() + 1)
117            .unwrap_or(0)
118    }
119
120    pub fn safe_epoch_table_watermarks(
121        &self,
122        existing_table_ids: &[u32],
123    ) -> BTreeMap<u32, TableWatermarks> {
124        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
125    }
126}
127
128pub fn safe_epoch_table_watermarks_impl(
129    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
130    existing_table_ids: &[u32],
131) -> BTreeMap<u32, TableWatermarks> {
132    fn extract_single_table_watermark(
133        table_watermarks: &TableWatermarks,
134    ) -> Option<TableWatermarks> {
135        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
136            Some(TableWatermarks {
137                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
138                direction: table_watermarks.direction,
139                watermark_type: table_watermarks.watermark_type,
140            })
141        } else {
142            None
143        }
144    }
145    table_watermarks
146        .iter()
147        .filter_map(|(table_id, table_watermarks)| {
148            let u32_table_id = table_id.table_id();
149            if !existing_table_ids.contains(&u32_table_id) {
150                None
151            } else {
152                extract_single_table_watermark(table_watermarks)
153                    .map(|table_watermarks| (table_id.table_id, table_watermarks))
154            }
155        })
156        .collect()
157}
158
159pub fn safe_epoch_read_table_watermarks_impl(
160    safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
161) -> BTreeMap<TableId, ReadTableWatermark> {
162    safe_epoch_watermarks
163        .into_iter()
164        .map(|(table_id, watermarks)| {
165            assert_eq!(watermarks.watermarks.len(), 1);
166            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
167            let mut vnode_watermark_map = BTreeMap::new();
168            for vnode_watermark in vnode_watermarks.iter() {
169                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
170                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
171                    assert!(
172                        vnode_watermark_map
173                            .insert(vnode, watermark.clone())
174                            .is_none(),
175                        "duplicate table watermark on vnode {}",
176                        vnode.to_index()
177                    );
178                }
179            }
180            (
181                TableId::from(table_id),
182                ReadTableWatermark {
183                    direction: watermarks.direction,
184                    vnode_watermarks: vnode_watermark_map,
185                },
186            )
187        })
188        .collect()
189}
190
191impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
192    pub fn count_new_ssts_in_group_split(
193        &self,
194        parent_group_id: CompactionGroupId,
195        split_key: Bytes,
196    ) -> u64 {
197        self.levels
198            .get(&parent_group_id)
199            .map_or(0, |parent_levels| {
200                let l0 = &parent_levels.l0;
201                let mut split_count = 0;
202                for sub_level in &l0.sub_levels {
203                    assert!(!sub_level.table_infos.is_empty());
204
205                    if sub_level.level_type == PbLevelType::Overlapping {
206                        // TODO: use table_id / vnode / key_range filter
207                        split_count += sub_level
208                            .table_infos
209                            .iter()
210                            .map(|sst| {
211                                if let group_split::SstSplitType::Both =
212                                    group_split::need_to_split(sst, split_key.clone())
213                                {
214                                    2
215                                } else {
216                                    0
217                                }
218                            })
219                            .sum::<u64>();
220                        continue;
221                    }
222
223                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
224                    let sst = sub_level.table_infos.get(pos).unwrap();
225
226                    if let group_split::SstSplitType::Both =
227                        group_split::need_to_split(sst, split_key.clone())
228                    {
229                        split_count += 2;
230                    }
231                }
232
233                for level in &parent_levels.levels {
234                    if level.table_infos.is_empty() {
235                        continue;
236                    }
237                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
238                    let sst = level.table_infos.get(pos).unwrap();
239                    if let group_split::SstSplitType::Both =
240                        group_split::need_to_split(sst, split_key.clone())
241                    {
242                        split_count += 2;
243                    }
244                }
245
246                split_count
247            })
248    }
249
    /// Populates the compaction group `group_id` with SSTs taken from `parent_group_id`
    /// (legacy, member-table-id based split).
    ///
    /// * `parent_group_id` - group to take SSTs from. When it equals
    ///   `StaticCompactionGroupId::NewCompactionGroup` there is nothing to take and the method
    ///   returns without touching any level.
    /// * `group_id` - the group that receives the SSTs; it must already be present in
    ///   `self.levels`.
    /// * `member_table_ids` - table ids handed to `split_sst_info_for_level`; presumably the
    ///   tables that move to `group_id` (confirm against that helper).
    /// * `new_sst_start_id` - first SST id that `split_sst_info_for_level` may hand out.
    ///
    /// Both groups get their `compaction_group_version_id` incremented.
    ///
    /// # Panics
    ///
    /// Panics if the parent group is neither `NewCompactionGroup` nor present in this version,
    /// if `group_id` is not present, if the SSTs of a non-L0 level of `group_id` cannot be
    /// concatenated after the insertion, or (debug builds only) if `new_sst_start_id` is
    /// non-zero for `NewCompactionGroup`.
    pub fn init_with_parent_group(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        member_table_ids: BTreeSet<StateTableId>,
        new_sst_start_id: HummockSstableId,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            // No parent to inherit from. A non-zero start id is unexpected here: fail loudly in
            // debug builds, only warn in release builds.
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from",
                parent_group_id
            );
        }
        // Borrow both groups mutably at once; the `unwrap` panics if `group_id` is missing.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Remove SST from sub level may result in empty sub level. It will be purged
                // whenever another compaction task is finished.
                let insert_table_infos =
                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
                // Drop the parent SSTs that no longer reference any table and keep the size
                // statistics of both the sub-level and the whole L0 in sync.
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                if insert_table_infos.is_empty() {
                    continue;
                }
                // `Ok(idx)`: add to the existing target sub-level at `idx`;
                // `Err(idx)`: create a new sub-level at position `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            // Purge the parent sub-levels that became empty above.
            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }
        // Non-L0 levels: the SSTs go to the level with the same index in the target group.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos =
                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Restore the key-range order of the target level and verify it.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            // Drop the parent SSTs that no longer reference any table.
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        // Neither group may be left with an empty L0 sub-level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
367
368    pub fn build_sst_delta_infos(
369        &self,
370        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
371    ) -> Vec<SstDeltaInfo> {
372        let mut infos = vec![];
373
374        // Skip trivial move delta for refiller
375        // The trivial move task only changes the position of the sst in the lsm, it does not modify the object information corresponding to the sst, and does not need to re-execute the refill.
376        if version_delta.trivial_move {
377            return infos;
378        }
379
380        for (group_id, group_deltas) in &version_delta.group_deltas {
381            let mut info = SstDeltaInfo::default();
382
383            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
384            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
385
386            // Build only if all deltas are intra level deltas.
387            if !group_deltas.group_deltas.iter().all(|delta| {
388                matches!(
389                    delta,
390                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
391                )
392            }) {
393                continue;
394            }
395
396            // TODO(MrCroxx): At most one insert delta is allowed here. It's okay for now with the
397            // current `hummock::manager::gen_version_delta` implementation. Better refactor the
398            // struct to reduce conventions.
399            for group_delta in &group_deltas.group_deltas {
400                match group_delta {
401                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
402                        if !inserted_table_infos.is_empty() {
403                            info.insert_sst_level = 0;
404                            info.insert_sst_infos
405                                .extend(inserted_table_infos.iter().cloned());
406                        }
407                    }
408                    GroupDeltaCommon::IntraLevel(intra_level) => {
409                        if !intra_level.inserted_table_infos.is_empty() {
410                            info.insert_sst_level = intra_level.level_idx;
411                            info.insert_sst_infos
412                                .extend(intra_level.inserted_table_infos.iter().cloned());
413                        }
414                        if !intra_level.removed_table_ids.is_empty() {
415                            for id in &intra_level.removed_table_ids {
416                                if intra_level.level_idx == 0 {
417                                    removed_l0_ssts.insert(*id);
418                                } else {
419                                    removed_ssts
420                                        .entry(intra_level.level_idx)
421                                        .or_default()
422                                        .insert(*id);
423                                }
424                            }
425                        }
426                    }
427                    GroupDeltaCommon::GroupConstruct(_)
428                    | GroupDeltaCommon::GroupDestroy(_)
429                    | GroupDeltaCommon::GroupMerge(_)
430                    | GroupDeltaCommon::TruncateTables(_) => {}
431                }
432            }
433
434            let group = self.levels.get(group_id).unwrap();
435            for l0_sub_level in &group.level0().sub_levels {
436                for sst_info in &l0_sub_level.table_infos {
437                    if removed_l0_ssts.remove(&sst_info.sst_id) {
438                        info.delete_sst_object_ids.push(sst_info.object_id);
439                    }
440                }
441            }
442            for level in &group.levels {
443                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
444                    for sst_info in &level.table_infos {
445                        if removed_level_ssts.remove(&sst_info.sst_id) {
446                            info.delete_sst_object_ids.push(sst_info.object_id);
447                        }
448                    }
449                    if !removed_level_ssts.is_empty() {
450                        tracing::error!(
451                            "removed_level_ssts is not empty: {:?}",
452                            removed_level_ssts,
453                        );
454                    }
455                    debug_assert!(removed_level_ssts.is_empty());
456                }
457            }
458
459            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
460                tracing::error!(
461                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
462                    removed_l0_ssts,
463                    removed_ssts
464                );
465            }
466            debug_assert!(removed_l0_ssts.is_empty());
467            debug_assert!(removed_ssts.is_empty());
468
469            infos.push(info);
470        }
471
472        infos
473    }
474
475    pub fn apply_version_delta(
476        &mut self,
477        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
478    ) {
479        assert_eq!(self.id, version_delta.prev_id);
480
481        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
482            &version_delta.state_table_info_delta,
483            &version_delta.removed_table_ids,
484        );
485
486        #[expect(deprecated)]
487        {
488            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
489                is_commit_epoch = true;
490                tracing::trace!(
491                    "max committed epoch bumped but no table committed epoch is changed"
492                );
493            }
494        }
495
496        // apply to `levels`, which is different compaction groups
497        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
498            let mut is_applied_l0_compact = false;
499            for group_delta in &group_deltas.group_deltas {
500                match group_delta {
501                    GroupDeltaCommon::GroupConstruct(group_construct) => {
502                        let mut new_levels = build_initial_compaction_group_levels(
503                            *compaction_group_id,
504                            group_construct.get_group_config().unwrap(),
505                        );
506                        let parent_group_id = group_construct.parent_group_id;
507                        new_levels.parent_group_id = parent_group_id;
508                        #[expect(deprecated)]
509                        // for backward-compatibility of previous hummock version delta
510                        new_levels
511                            .member_table_ids
512                            .clone_from(&group_construct.table_ids);
513                        self.levels.insert(*compaction_group_id, new_levels);
514                        let member_table_ids = if group_construct.version()
515                            >= CompatibilityVersion::NoMemberTableIds
516                        {
517                            self.state_table_info
518                                .compaction_group_member_table_ids(*compaction_group_id)
519                                .iter()
520                                .map(|table_id| table_id.table_id)
521                                .collect()
522                        } else {
523                            #[expect(deprecated)]
524                            // for backward-compatibility of previous hummock version delta
525                            BTreeSet::from_iter(group_construct.table_ids.clone())
526                        };
527
528                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
529                            let split_key = if group_construct.split_key.is_some() {
530                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
531                            } else {
532                                None
533                            };
534                            self.init_with_parent_group_v2(
535                                parent_group_id,
536                                *compaction_group_id,
537                                group_construct.get_new_sst_start_id().into(),
538                                split_key.clone(),
539                            );
540                        } else {
541                            // for backward-compatibility of previous hummock version delta
542                            self.init_with_parent_group(
543                                parent_group_id,
544                                *compaction_group_id,
545                                member_table_ids,
546                                group_construct.get_new_sst_start_id().into(),
547                            );
548                        }
549                    }
550                    GroupDeltaCommon::GroupMerge(group_merge) => {
551                        tracing::info!(
552                            "group_merge left {:?} right {:?}",
553                            group_merge.left_group_id,
554                            group_merge.right_group_id
555                        );
556                        self.merge_compaction_group(
557                            group_merge.left_group_id,
558                            group_merge.right_group_id,
559                        )
560                    }
561                    GroupDeltaCommon::IntraLevel(level_delta) => {
562                        let levels =
563                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
564                                panic!("compaction group {} does not exist", compaction_group_id)
565                            });
566                        if is_commit_epoch {
567                            assert!(
568                                level_delta.removed_table_ids.is_empty(),
569                                "no sst should be deleted when committing an epoch"
570                            );
571
572                            let IntraLevelDelta {
573                                level_idx,
574                                l0_sub_level_id,
575                                inserted_table_infos,
576                                ..
577                            } = level_delta;
578                            {
579                                assert_eq!(
580                                    *level_idx, 0,
581                                    "we should only add to L0 when we commit an epoch."
582                                );
583                                if !inserted_table_infos.is_empty() {
584                                    insert_new_sub_level(
585                                        &mut levels.l0,
586                                        *l0_sub_level_id,
587                                        PbLevelType::Overlapping,
588                                        inserted_table_infos.clone(),
589                                        None,
590                                    );
591                                }
592                            }
593                        } else {
594                            // The delta is caused by compaction.
595                            levels.apply_compact_ssts(
596                                level_delta,
597                                self.state_table_info
598                                    .compaction_group_member_table_ids(*compaction_group_id),
599                            );
600                            if level_delta.level_idx == 0 {
601                                is_applied_l0_compact = true;
602                            }
603                        }
604                    }
605                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
606                        let levels =
607                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
608                                panic!("compaction group {} does not exist", compaction_group_id)
609                            });
610                        assert!(is_commit_epoch);
611
612                        if !inserted_table_infos.is_empty() {
613                            let next_l0_sub_level_id = levels
614                                .l0
615                                .sub_levels
616                                .last()
617                                .map(|level| level.sub_level_id + 1)
618                                .unwrap_or(1);
619
620                            insert_new_sub_level(
621                                &mut levels.l0,
622                                next_l0_sub_level_id,
623                                PbLevelType::Overlapping,
624                                inserted_table_infos.clone(),
625                                None,
626                            );
627                        }
628                    }
629                    GroupDeltaCommon::GroupDestroy(_) => {
630                        self.levels.remove(compaction_group_id);
631                    }
632
633                    GroupDeltaCommon::TruncateTables(table_ids) => {
634                        // iterate all levels and truncate the specified tables with the given table ids
635                        let levels =
636                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
637                                panic!("compaction group {} does not exist", compaction_group_id)
638                            });
639
640                        for sub_level in &mut levels.l0.sub_levels {
641                            sub_level.table_infos.iter_mut().for_each(|sstable_info| {
642                                let mut inner = sstable_info.get_inner();
643                                inner.table_ids.retain(|id| !table_ids.contains(id));
644                                sstable_info.set_inner(inner);
645                            });
646                        }
647
648                        for level in &mut levels.levels {
649                            level.table_infos.iter_mut().for_each(|sstable_info| {
650                                let mut inner = sstable_info.get_inner();
651                                inner.table_ids.retain(|id| !table_ids.contains(id));
652                                sstable_info.set_inner(inner);
653                            });
654                        }
655
656                        levels.compaction_group_version_id += 1;
657                    }
658                }
659            }
660
661            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
662            {
663                levels.post_apply_l0_compact();
664            }
665        }
666        self.id = version_delta.id;
667        #[expect(deprecated)]
668        {
669            self.max_committed_epoch = version_delta.max_committed_epoch;
670        }
671
672        // apply to table watermark
673
674        // Store the table watermarks that needs to be updated. None means to remove the table watermark of the table id
675        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
676            HashMap::new();
677
678        // apply to table watermark
679        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
680            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
681                if version_delta.removed_table_ids.contains(table_id) {
682                    modified_table_watermarks.insert(*table_id, None);
683                } else {
684                    let mut current_table_watermarks = (**current_table_watermarks).clone();
685                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
686                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
687                }
688            } else {
689                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
690            }
691        }
692        for (table_id, table_watermarks) in &self.table_watermarks {
693            let safe_epoch = if let Some(state_table_info) =
694                self.state_table_info.info().get(table_id)
695                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
696                && state_table_info.committed_epoch > *oldest_epoch
697            {
698                // safe epoch has progressed, need further clear.
699                state_table_info.committed_epoch
700            } else {
701                // safe epoch not progressed or the table has been removed. No need to truncate
702                continue;
703            };
704            let table_watermarks = modified_table_watermarks
705                .entry(*table_id)
706                .or_insert_with(|| Some((**table_watermarks).clone()));
707            if let Some(table_watermarks) = table_watermarks {
708                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
709            }
710        }
711        // apply the staging table watermark to hummock version
712        for (table_id, table_watermarks) in modified_table_watermarks {
713            if let Some(table_watermarks) = table_watermarks {
714                self.table_watermarks
715                    .insert(table_id, Arc::new(table_watermarks));
716            } else {
717                self.table_watermarks.remove(&table_id);
718            }
719        }
720
721        // apply to table change log
722        Self::apply_change_log_delta(
723            &mut self.table_change_log,
724            &version_delta.change_log_delta,
725            &version_delta.removed_table_ids,
726            &version_delta.state_table_info_delta,
727            &changed_table_info,
728        );
729
730        // apply to vector index
731        apply_vector_index_delta(
732            &mut self.vector_indexes,
733            &version_delta.vector_index_delta,
734            &version_delta.removed_table_ids,
735        );
736    }
737
738    pub fn apply_change_log_delta<T: Clone>(
739        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
740        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
741        removed_table_ids: &HashSet<TableId>,
742        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
743        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
744    ) {
745        for (table_id, change_log_delta) in change_log_delta {
746            let new_change_log = &change_log_delta.new_log;
747            match table_change_log.entry(*table_id) {
748                Entry::Occupied(entry) => {
749                    let change_log = entry.into_mut();
750                    change_log.add_change_log(new_change_log.clone());
751                }
752                Entry::Vacant(entry) => {
753                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
754                }
755            };
756        }
757
758        // If a table has no new change log entry (even an empty one), it means we have stopped maintained
759        // the change log for the table, and then we will remove the table change log.
760        // The table change log will also be removed when the table id is removed.
761        table_change_log.retain(|table_id, _| {
762            if removed_table_ids.contains(table_id) {
763                return false;
764            }
765            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
766                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
767                // the table exists previously, and its committed epoch has progressed.
768            } else {
769                // otherwise, the table change log should be kept anyway
770                return true;
771            }
772            let contains = change_log_delta.contains_key(table_id);
773            if !contains {
774                warn!(
775                        ?table_id,
776                        "table change log dropped due to no further change log at newly committed epoch",
777                    );
778            }
779            contains
780        });
781
782        // truncate the remaining table change log
783        for (table_id, change_log_delta) in change_log_delta {
784            if let Some(change_log) = table_change_log.get_mut(table_id) {
785                change_log.truncate(change_log_delta.truncate_epoch);
786            }
787        }
788    }
789
790    pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
791        let mut ret: BTreeMap<_, _> = BTreeMap::new();
792        for (compaction_group_id, group) in &self.levels {
793            let mut levels = vec![];
794            levels.extend(group.l0.sub_levels.iter());
795            levels.extend(group.levels.iter());
796            for level in levels {
797                for table_info in &level.table_infos {
798                    if table_info.sst_id.inner() == table_info.object_id.inner() {
799                        continue;
800                    }
801                    let object_id = table_info.object_id;
802                    let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
803                    entry
804                        .entry(*compaction_group_id)
805                        .or_default()
806                        .push(table_info.sst_id)
807                }
808            }
809        }
810        ret
811    }
812
813    pub fn merge_compaction_group(
814        &mut self,
815        left_group_id: CompactionGroupId,
816        right_group_id: CompactionGroupId,
817    ) {
818        // Double check
819        let left_group_id_table_ids = self
820            .state_table_info
821            .compaction_group_member_table_ids(left_group_id)
822            .iter()
823            .map(|table_id| table_id.table_id);
824        let right_group_id_table_ids = self
825            .state_table_info
826            .compaction_group_member_table_ids(right_group_id)
827            .iter()
828            .map(|table_id| table_id.table_id);
829
830        assert!(
831            left_group_id_table_ids
832                .chain(right_group_id_table_ids)
833                .is_sorted()
834        );
835
836        let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
837        let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
838            panic!(
839                "compaction group should exist right {} all {:?}",
840                right_group_id, total_cg
841            )
842        });
843
844        let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
845            panic!(
846                "compaction group should exist left {} all {:?}",
847                left_group_id, total_cg
848            )
849        });
850
851        group_split::merge_levels(left_levels, right_levels);
852    }
853
    /// Initializes the compaction group `group_id` from `parent_group_id` by moving the SSTs
    /// returned by `group_split::split_sst_info_for_level_v2` for `split_key` out of the
    /// parent's levels and into the same levels of `group_id`.
    ///
    /// * `parent_group_id` - the group to split from. When it equals
    ///   `StaticCompactionGroupId::NewCompactionGroup`, the method returns without touching any
    ///   levels.
    /// * `group_id` - the group receiving the split SSTs; it must already exist in `self.levels`.
    /// * `new_sst_start_id` - starting value of the SST id counter handed to
    ///   `split_sst_info_for_level_v2`.
    /// * `split_key` - when `None`, no SST is moved; only the group version ids are bumped.
    ///
    /// # Panics
    ///
    /// * In debug builds, if `parent_group_id` is `NewCompactionGroup` and `new_sst_start_id`
    ///   is non-zero (release builds only log a warning).
    /// * If `parent_group_id` is not `NewCompactionGroup` and is absent from `self.levels`.
    /// * If `group_id` is absent from `self.levels` (the `unwrap` on the disjoint lookup).
    /// * If a target level is no longer concat-able after the insertion, or if an empty L0
    ///   sub-level is left in either group at the end.
    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: HummockSstableId,
        split_key: Option<Bytes>,
    ) {
        // Running SST id counter, passed by mutable reference to every split call below.
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            // No parent to split from: a non-zero start id is unexpected here.
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        // Borrow both groups mutably at once; `unwrap` panics if either one is missing.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        // Split every L0 sub-level of the parent.
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Remove SST from sub level may result in empty sub level. It will be purged
                // whenever another compaction task is finished.
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                // Nothing was split out of this sub-level, so leave it untouched.
                if insert_table_infos.is_empty() {
                    continue;
                }

                // Drop the SSTs left with no table ids from the parent sub-level, subtracting
                // their sizes from both the sub-level totals and the parent L0 totals.
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                // `Ok(idx)`: add the SSTs to the existing target sub-level at `idx`;
                // `Err(idx)`: insert a new sub-level at position `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            // Purge the parent sub-levels that became empty after the split.
            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }

        // Split every non-L0 level of the parent into the level with the same index in the
        // target group.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            // Account for the moved SSTs in the target level before appending them.
            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Re-sort by key range and check that the target level is still concat-able.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            // Drop the SSTs left with no table ids from the parent level and subtract their
            // sizes from the level totals.
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        // Neither group may be left with an empty L0 sub-level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
991}
992
993impl<T> HummockVersionCommon<T>
994where
995    T: SstableIdReader + ObjectIdReader,
996{
997    pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
998        self.get_sst_infos(exclude_change_log)
999            .map(|s| s.object_id())
1000            .collect()
1001    }
1002
1003    pub fn get_object_ids(
1004        &self,
1005        exclude_change_log: bool,
1006    ) -> impl Iterator<Item = HummockObjectId> + '_ {
1007        // DO NOT REMOVE THIS LINE
1008        // This is to ensure that when adding new variant to `HummockObjectId`,
1009        // the compiler will warn us if we forget to handle it here.
1010        match HummockObjectId::Sstable(0.into()) {
1011            HummockObjectId::Sstable(_) => {}
1012            HummockObjectId::VectorFile(_) => {}
1013            HummockObjectId::HnswGraphFile(_) => {}
1014        };
1015        self.get_sst_infos(exclude_change_log)
1016            .map(|s| HummockObjectId::Sstable(s.object_id()))
1017            .chain(
1018                self.vector_indexes
1019                    .values()
1020                    .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
1021            )
1022    }
1023
1024    pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
1025        self.get_sst_infos(exclude_change_log)
1026            .map(|s| s.sst_id())
1027            .collect()
1028    }
1029
1030    pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
1031        let may_table_change_log = if exclude_change_log {
1032            None
1033        } else {
1034            Some(self.table_change_log.values())
1035        };
1036        self.get_combined_levels()
1037            .flat_map(|level| level.table_infos.iter())
1038            .chain(
1039                may_table_change_log
1040                    .map(|v| {
1041                        v.flat_map(|table_change_log| {
1042                            table_change_log.iter().flat_map(|epoch_change_log| {
1043                                epoch_change_log
1044                                    .old_value
1045                                    .iter()
1046                                    .chain(epoch_change_log.new_value.iter())
1047                            })
1048                        })
1049                    })
1050                    .into_iter()
1051                    .flatten(),
1052            )
1053    }
1054}
1055
1056impl Levels {
1057    pub(crate) fn apply_compact_ssts(
1058        &mut self,
1059        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
1060        member_table_ids: &BTreeSet<TableId>,
1061    ) {
1062        let IntraLevelDeltaCommon {
1063            level_idx,
1064            l0_sub_level_id,
1065            inserted_table_infos: insert_table_infos,
1066            vnode_partition_count,
1067            removed_table_ids: delete_sst_ids_set,
1068            compaction_group_version_id,
1069        } = level_delta;
1070        let new_vnode_partition_count = *vnode_partition_count;
1071
1072        if is_compaction_task_expired(
1073            self.compaction_group_version_id,
1074            *compaction_group_version_id,
1075        ) {
1076            warn!(
1077                current_compaction_group_version_id = self.compaction_group_version_id,
1078                delta_compaction_group_version_id = compaction_group_version_id,
1079                level_idx,
1080                l0_sub_level_id,
1081                insert_table_infos = ?insert_table_infos
1082                    .iter()
1083                    .map(|sst| (sst.sst_id, sst.object_id))
1084                    .collect_vec(),
1085                ?delete_sst_ids_set,
1086                "This VersionDelta may be committed by an expired compact task. Please check it."
1087            );
1088            return;
1089        }
1090        if !delete_sst_ids_set.is_empty() {
1091            if *level_idx == 0 {
1092                for level in &mut self.l0.sub_levels {
1093                    level_delete_ssts(level, delete_sst_ids_set);
1094                }
1095            } else {
1096                let idx = *level_idx as usize - 1;
1097                level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
1098            }
1099        }
1100
1101        if !insert_table_infos.is_empty() {
1102            let insert_sst_level_id = *level_idx;
1103            let insert_sub_level_id = *l0_sub_level_id;
1104            if insert_sst_level_id == 0 {
1105                let l0 = &mut self.l0;
1106                let index = l0
1107                    .sub_levels
1108                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
1109                assert!(
1110                    index < l0.sub_levels.len()
1111                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
1112                    "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
1113                    insert_sub_level_id,
1114                    delete_sst_ids_set,
1115                    l0.sub_levels
1116                        .iter()
1117                        .map(|level| level.sub_level_id)
1118                        .collect_vec()
1119                );
1120                if l0.sub_levels[index].table_infos.is_empty()
1121                    && member_table_ids.len() == 1
1122                    && insert_table_infos.iter().all(|sst| {
1123                        sst.table_ids.len() == 1
1124                            && sst.table_ids[0]
1125                                == member_table_ids.iter().next().expect("non-empty").table_id
1126                    })
1127                {
1128                    // Only change vnode_partition_count for group which has only one state-table.
1129                    // Only change vnode_partition_count for level which update all sst files in this compact task.
1130                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
1131                }
1132                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
1133            } else {
1134                let idx = insert_sst_level_id as usize - 1;
1135                if self.levels[idx].table_infos.is_empty()
1136                    && insert_table_infos
1137                        .iter()
1138                        .all(|sst| sst.table_ids.len() == 1)
1139                {
1140                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
1141                } else if self.levels[idx].vnode_partition_count != 0
1142                    && new_vnode_partition_count == 0
1143                    && member_table_ids.len() > 1
1144                {
1145                    self.levels[idx].vnode_partition_count = 0;
1146                }
1147                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
1148            }
1149        }
1150    }
1151
1152    pub(crate) fn post_apply_l0_compact(&mut self) {
1153        {
1154            self.l0
1155                .sub_levels
1156                .retain(|level| !level.table_infos.is_empty());
1157            self.l0.total_file_size = self
1158                .l0
1159                .sub_levels
1160                .iter()
1161                .map(|level| level.total_file_size)
1162                .sum::<u64>();
1163            self.l0.uncompressed_file_size = self
1164                .l0
1165                .sub_levels
1166                .iter()
1167                .map(|level| level.uncompressed_file_size)
1168                .sum::<u64>();
1169        }
1170    }
1171}
1172
1173impl<T, L> HummockVersionCommon<T, L> {
1174    pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1175        self.levels
1176            .values()
1177            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1178    }
1179}
1180
1181pub fn build_initial_compaction_group_levels(
1182    group_id: CompactionGroupId,
1183    compaction_config: &CompactionConfig,
1184) -> Levels {
1185    let mut levels = vec![];
1186    for l in 0..compaction_config.get_max_level() {
1187        levels.push(Level {
1188            level_idx: (l + 1) as u32,
1189            level_type: PbLevelType::Nonoverlapping,
1190            table_infos: vec![],
1191            total_file_size: 0,
1192            sub_level_id: 0,
1193            uncompressed_file_size: 0,
1194            vnode_partition_count: 0,
1195        });
1196    }
1197    #[expect(deprecated)] // for backward-compatibility of previous hummock version delta
1198    Levels {
1199        levels,
1200        l0: OverlappingLevel {
1201            sub_levels: vec![],
1202            total_file_size: 0,
1203            uncompressed_file_size: 0,
1204        },
1205        group_id,
1206        parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
1207        member_table_ids: vec![],
1208        compaction_group_version_id: 0,
1209    }
1210}
1211
1212fn split_sst_info_for_level(
1213    member_table_ids: &BTreeSet<u32>,
1214    level: &mut Level,
1215    new_sst_id: &mut HummockSstableId,
1216) -> Vec<SstableInfo> {
1217    // Remove SST from sub level may result in empty sub level. It will be purged
1218    // whenever another compaction task is finished.
1219    let mut insert_table_infos = vec![];
1220    for sst_info in &mut level.table_infos {
1221        let removed_table_ids = sst_info
1222            .table_ids
1223            .iter()
1224            .filter(|table_id| member_table_ids.contains(table_id))
1225            .cloned()
1226            .collect_vec();
1227        let sst_size = sst_info.sst_size;
1228        if sst_size / 2 == 0 {
1229            tracing::warn!(
1230                id = %sst_info.sst_id,
1231                object_id = %sst_info.object_id,
1232                sst_size = sst_info.sst_size,
1233                file_size = sst_info.file_size,
1234                "Sstable sst_size is under expected",
1235            );
1236        };
1237        if !removed_table_ids.is_empty() {
1238            let (modified_sst, branch_sst) = split_sst_with_table_ids(
1239                sst_info,
1240                new_sst_id,
1241                sst_size / 2,
1242                sst_size / 2,
1243                member_table_ids.iter().cloned().collect_vec(),
1244            );
1245            *sst_info = modified_sst;
1246            insert_table_infos.push(branch_sst);
1247        }
1248    }
1249    insert_table_infos
1250}
1251
1252/// Gets all compaction group ids.
1253pub fn get_compaction_group_ids(
1254    version: &HummockVersion,
1255) -> impl Iterator<Item = CompactionGroupId> + '_ {
1256    version.levels.keys().cloned()
1257}
1258
1259pub fn get_table_compaction_group_id_mapping(
1260    version: &HummockVersion,
1261) -> HashMap<StateTableId, CompactionGroupId> {
1262    version
1263        .state_table_info
1264        .info()
1265        .iter()
1266        .map(|(table_id, info)| (table_id.table_id, info.compaction_group_id))
1267        .collect()
1268}
1269
1270/// Gets all SSTs in `group_id`
1271pub fn get_compaction_group_ssts(
1272    version: &HummockVersion,
1273    group_id: CompactionGroupId,
1274) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1275    let group_levels = version.get_compaction_group_levels(group_id);
1276    group_levels
1277        .l0
1278        .sub_levels
1279        .iter()
1280        .rev()
1281        .chain(group_levels.levels.iter())
1282        .flat_map(|level| {
1283            level
1284                .table_infos
1285                .iter()
1286                .map(|table_info| (table_info.object_id, table_info.sst_id))
1287        })
1288}
1289
1290pub fn new_sub_level(
1291    sub_level_id: u64,
1292    level_type: PbLevelType,
1293    table_infos: Vec<SstableInfo>,
1294) -> Level {
1295    if level_type == PbLevelType::Nonoverlapping {
1296        debug_assert!(
1297            can_concat(&table_infos),
1298            "sst of non-overlapping level is not concat-able: {:?}",
1299            table_infos
1300        );
1301    }
1302    let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1303    let uncompressed_file_size = table_infos
1304        .iter()
1305        .map(|table| table.uncompressed_file_size)
1306        .sum();
1307    Level {
1308        level_idx: 0,
1309        level_type,
1310        table_infos,
1311        total_file_size,
1312        sub_level_id,
1313        uncompressed_file_size,
1314        vnode_partition_count: 0,
1315    }
1316}
1317
1318pub fn add_ssts_to_sub_level(
1319    l0: &mut OverlappingLevel,
1320    sub_level_idx: usize,
1321    insert_table_infos: Vec<SstableInfo>,
1322) {
1323    insert_table_infos.iter().for_each(|sst| {
1324        l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1325        l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1326        l0.total_file_size += sst.sst_size;
1327        l0.uncompressed_file_size += sst.uncompressed_file_size;
1328    });
1329    l0.sub_levels[sub_level_idx]
1330        .table_infos
1331        .extend(insert_table_infos);
1332    if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1333        l0.sub_levels[sub_level_idx]
1334            .table_infos
1335            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1336        assert!(
1337            can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1338            "sstable ids: {:?}",
1339            l0.sub_levels[sub_level_idx]
1340                .table_infos
1341                .iter()
1342                .map(|sst| sst.sst_id)
1343                .collect_vec()
1344        );
1345    }
1346}
1347
1348/// `None` value of `sub_level_insert_hint` means append.
1349pub fn insert_new_sub_level(
1350    l0: &mut OverlappingLevel,
1351    insert_sub_level_id: u64,
1352    level_type: PbLevelType,
1353    insert_table_infos: Vec<SstableInfo>,
1354    sub_level_insert_hint: Option<usize>,
1355) {
1356    if insert_sub_level_id == u64::MAX {
1357        return;
1358    }
1359    let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1360        insert_pos
1361    } else {
1362        if let Some(newest_level) = l0.sub_levels.last() {
1363            assert!(
1364                newest_level.sub_level_id < insert_sub_level_id,
1365                "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1366                newest_level.sub_level_id,
1367                insert_sub_level_id,
1368                l0,
1369            );
1370        }
1371        l0.sub_levels.len()
1372    };
1373    #[cfg(debug_assertions)]
1374    {
1375        if insert_pos > 0
1376            && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1377        {
1378            debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1379        }
1380        if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1381            debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1382        }
1383    }
1384    // All files will be committed in one new Overlapping sub-level and become
1385    // Nonoverlapping  after at least one compaction.
1386    let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1387    l0.total_file_size += level.total_file_size;
1388    l0.uncompressed_file_size += level.uncompressed_file_size;
1389    l0.sub_levels.insert(insert_pos, level);
1390}
1391
1392/// Delete sstables if the table id is in the id set.
1393///
1394/// Return `true` if some sst is deleted, and `false` is the deletion is trivial
1395fn level_delete_ssts(
1396    operand: &mut Level,
1397    delete_sst_ids_superset: &HashSet<HummockSstableId>,
1398) -> bool {
1399    let original_len = operand.table_infos.len();
1400    operand
1401        .table_infos
1402        .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1403    operand.total_file_size = operand
1404        .table_infos
1405        .iter()
1406        .map(|table| table.sst_size)
1407        .sum::<u64>();
1408    operand.uncompressed_file_size = operand
1409        .table_infos
1410        .iter()
1411        .map(|table| table.uncompressed_file_size)
1412        .sum::<u64>();
1413    original_len != operand.table_infos.len()
1414}
1415
1416fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1417    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1418        format!(
1419            "sstable ids: {:?}",
1420            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1421        )
1422    }
1423    operand.total_file_size += insert_table_infos
1424        .iter()
1425        .map(|sst| sst.sst_size)
1426        .sum::<u64>();
1427    operand.uncompressed_file_size += insert_table_infos
1428        .iter()
1429        .map(|sst| sst.uncompressed_file_size)
1430        .sum::<u64>();
1431    if operand.level_type == PbLevelType::Overlapping {
1432        operand.level_type = PbLevelType::Nonoverlapping;
1433        operand
1434            .table_infos
1435            .extend(insert_table_infos.iter().cloned());
1436        operand
1437            .table_infos
1438            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1439        assert!(
1440            can_concat(&operand.table_infos),
1441            "{}",
1442            display_sstable_infos(&operand.table_infos)
1443        );
1444    } else if !insert_table_infos.is_empty() {
1445        let sorted_insert: Vec<_> = insert_table_infos
1446            .iter()
1447            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1448            .cloned()
1449            .collect();
1450        let first = &sorted_insert[0];
1451        let last = &sorted_insert[sorted_insert.len() - 1];
1452        let pos = operand
1453            .table_infos
1454            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1455        if pos >= operand.table_infos.len()
1456            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1457        {
1458            operand.table_infos.splice(pos..pos, sorted_insert);
1459            // Validate the inserted SST batch along with the two SSTs that precede and follow it.
1460            let validate_range = operand
1461                .table_infos
1462                .iter()
1463                .skip(pos.saturating_sub(1))
1464                .take(insert_table_infos.len() + 2)
1465                .collect_vec();
1466            assert!(
1467                can_concat(&validate_range),
1468                "{}",
1469                display_sstable_infos(&validate_range),
1470            );
1471        } else {
1472            // If this branch is reached, it indicates some unexpected behavior in compaction.
1473            // Here we issue a warning and fall back to insert one by one.
1474            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1475            for i in insert_table_infos {
1476                let pos = operand
1477                    .table_infos
1478                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1479                operand.table_infos.insert(pos, i.clone());
1480            }
1481            assert!(
1482                can_concat(&operand.table_infos),
1483                "{}",
1484                display_sstable_infos(&operand.table_infos)
1485            );
1486        }
1487    }
1488}
1489
1490pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1491    // DO NOT REMOVE THIS LINE
1492    // This is to ensure that when adding new variant to `HummockObjectId`,
1493    // the compiler will warn us if we forget to handle it here.
1494    match HummockObjectId::Sstable(0.into()) {
1495        HummockObjectId::Sstable(_) => {}
1496        HummockObjectId::VectorFile(_) => {}
1497        HummockObjectId::HnswGraphFile(_) => {}
1498    };
1499    version
1500        .levels
1501        .values()
1502        .flat_map(|cg| {
1503            cg.level0()
1504                .sub_levels
1505                .iter()
1506                .chain(cg.levels.iter())
1507                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1508        })
1509        .chain(version.table_change_log.values().flat_map(|c| {
1510            c.iter().flat_map(|l| {
1511                l.old_value
1512                    .iter()
1513                    .chain(l.new_value.iter())
1514                    .map(|t| (t.object_id, t.file_size))
1515            })
1516        }))
1517        .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1518        .chain(
1519            version
1520                .vector_indexes
1521                .values()
1522                .flat_map(|index| index.get_objects()),
1523        )
1524        .collect()
1525}
1526
1527/// Verify the validity of a `HummockVersion` and return a list of violations if any.
1528/// Currently this method is only used by risectl validate-version.
1529pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1530    let mut res = Vec::new();
1531    // Ensure each table maps to only one compaction group
1532    for (group_id, levels) in &version.levels {
1533        // Ensure compaction group id matches
1534        if levels.group_id != *group_id {
1535            res.push(format!(
1536                "GROUP {}: inconsistent group id {} in Levels",
1537                group_id, levels.group_id
1538            ));
1539        }
1540
1541        let validate_level = |group: CompactionGroupId,
1542                              expected_level_idx: u32,
1543                              level: &Level,
1544                              res: &mut Vec<String>| {
1545            let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1546            if level.level_idx == 0 {
1547                level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1548                // Ensure sub-level is not empty
1549                if level.table_infos.is_empty() {
1550                    res.push(format!("{}: empty level", level_identifier));
1551                }
1552            } else if level.level_type != PbLevelType::Nonoverlapping {
1553                // Ensure non-L0 level is non-overlapping level
1554                res.push(format!(
1555                    "{}: level type {:?} is not non-overlapping",
1556                    level_identifier, level.level_type
1557                ));
1558            }
1559
1560            // Ensure level idx matches
1561            if level.level_idx != expected_level_idx {
1562                res.push(format!(
1563                    "{}: mismatched level idx {}",
1564                    level_identifier, expected_level_idx
1565                ));
1566            }
1567
1568            let mut prev_table_info: Option<&SstableInfo> = None;
1569            for table_info in &level.table_infos {
1570                // Ensure table_ids are sorted and unique
1571                if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1572                    res.push(format!(
1573                        "{} SST {}: table_ids not sorted",
1574                        level_identifier, table_info.object_id
1575                    ));
1576                }
1577
1578                // Ensure SSTs in non-overlapping level have non-overlapping key range
1579                if level.level_type == PbLevelType::Nonoverlapping {
1580                    if let Some(prev) = prev_table_info.take()
1581                        && prev
1582                            .key_range
1583                            .compare_right_with(&table_info.key_range.left)
1584                            != Ordering::Less
1585                    {
1586                        res.push(format!(
1587                            "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1588                            level_identifier, table_info.object_id, prev, table_info
1589                        ));
1590                    }
1591                    let _ = prev_table_info.insert(table_info);
1592                }
1593            }
1594        };
1595
1596        let l0 = &levels.l0;
1597        let mut prev_sub_level_id = u64::MAX;
1598        for sub_level in &l0.sub_levels {
1599            // Ensure sub_level_id is sorted and unique
1600            if sub_level.sub_level_id >= prev_sub_level_id {
1601                res.push(format!(
1602                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1603                    group_id, sub_level.level_idx, prev_sub_level_id
1604                ));
1605            }
1606            prev_sub_level_id = sub_level.sub_level_id;
1607
1608            validate_level(*group_id, 0, sub_level, &mut res);
1609        }
1610
1611        for idx in 1..=levels.levels.len() {
1612            validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1613        }
1614    }
1615    res
1616}
1617
1618#[cfg(test)]
1619mod tests {
1620    use std::collections::{HashMap, HashSet};
1621
1622    use bytes::Bytes;
1623    use risingwave_common::catalog::TableId;
1624    use risingwave_common::hash::VirtualNode;
1625    use risingwave_common::util::epoch::test_epoch;
1626    use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1627
1628    use super::group_split;
1629    use crate::HummockVersionId;
1630    use crate::compaction_group::group_split::*;
1631    use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1632    use crate::key::{FullKey, gen_key_from_str};
1633    use crate::key_range::KeyRange;
1634    use crate::level::{Level, Levels, OverlappingLevel};
1635    use crate::sstable_info::{SstableInfo, SstableInfoInner};
1636    use crate::version::{
1637        GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1638    };
1639
1640    fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1641        gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1642    }
1643
1644    fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1645        let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1646        let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1647        let full_key_l = FullKey::for_test(
1648            TableId::new(*table_ids.first().unwrap()),
1649            table_key_l,
1650            epoch,
1651        )
1652        .encode();
1653        let full_key_r =
1654            FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1655                .encode();
1656
1657        SstableInfoInner {
1658            sst_id: sst_id.into(),
1659            key_range: KeyRange {
1660                left: full_key_l.into(),
1661                right: full_key_r.into(),
1662                right_exclusive: false,
1663            },
1664            table_ids,
1665            object_id: sst_id.into(),
1666            min_epoch: 20,
1667            max_epoch: 20,
1668            file_size: 100,
1669            sst_size: 100,
1670            ..Default::default()
1671        }
1672    }
1673
1674    #[test]
1675    fn test_get_sst_object_ids() {
1676        let mut version = HummockVersion {
1677            id: HummockVersionId::new(0),
1678            levels: HashMap::from_iter([(
1679                0,
1680                Levels {
1681                    levels: vec![],
1682                    l0: OverlappingLevel {
1683                        sub_levels: vec![],
1684                        total_file_size: 0,
1685                        uncompressed_file_size: 0,
1686                    },
1687                    ..Default::default()
1688                },
1689            )]),
1690            ..Default::default()
1691        };
1692        assert_eq!(version.get_object_ids(false).count(), 0);
1693
1694        // Add to sub level
1695        version
1696            .levels
1697            .get_mut(&0)
1698            .unwrap()
1699            .l0
1700            .sub_levels
1701            .push(Level {
1702                table_infos: vec![
1703                    SstableInfoInner {
1704                        object_id: 11.into(),
1705                        sst_id: 11.into(),
1706                        ..Default::default()
1707                    }
1708                    .into(),
1709                ],
1710                ..Default::default()
1711            });
1712        assert_eq!(version.get_object_ids(false).count(), 1);
1713
1714        // Add to non sub level
1715        version.levels.get_mut(&0).unwrap().levels.push(Level {
1716            table_infos: vec![
1717                SstableInfoInner {
1718                    object_id: 22.into(),
1719                    sst_id: 22.into(),
1720                    ..Default::default()
1721                }
1722                .into(),
1723            ],
1724            ..Default::default()
1725        });
1726        assert_eq!(version.get_object_ids(false).count(), 2);
1727    }
1728
1729    #[test]
1730    fn test_apply_version_delta() {
1731        let mut version = HummockVersion {
1732            id: HummockVersionId::new(0),
1733            levels: HashMap::from_iter([
1734                (
1735                    0,
1736                    build_initial_compaction_group_levels(
1737                        0,
1738                        &CompactionConfig {
1739                            max_level: 6,
1740                            ..Default::default()
1741                        },
1742                    ),
1743                ),
1744                (
1745                    1,
1746                    build_initial_compaction_group_levels(
1747                        1,
1748                        &CompactionConfig {
1749                            max_level: 6,
1750                            ..Default::default()
1751                        },
1752                    ),
1753                ),
1754            ]),
1755            ..Default::default()
1756        };
1757        let version_delta = HummockVersionDelta {
1758            id: HummockVersionId::new(1),
1759            group_deltas: HashMap::from_iter([
1760                (
1761                    2,
1762                    GroupDeltas {
1763                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1764                            group_config: Some(CompactionConfig {
1765                                max_level: 6,
1766                                ..Default::default()
1767                            }),
1768                            ..Default::default()
1769                        }))],
1770                    },
1771                ),
1772                (
1773                    0,
1774                    GroupDeltas {
1775                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1776                    },
1777                ),
1778                (
1779                    1,
1780                    GroupDeltas {
1781                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1782                            1,
1783                            0,
1784                            HashSet::new(),
1785                            vec![
1786                                SstableInfoInner {
1787                                    object_id: 1.into(),
1788                                    sst_id: 1.into(),
1789                                    ..Default::default()
1790                                }
1791                                .into(),
1792                            ],
1793                            0,
1794                            version
1795                                .levels
1796                                .get(&1)
1797                                .as_ref()
1798                                .unwrap()
1799                                .compaction_group_version_id,
1800                        ))],
1801                    },
1802                ),
1803            ]),
1804            ..Default::default()
1805        };
1806        let version_delta = version_delta;
1807
1808        version.apply_version_delta(&version_delta);
1809        let mut cg1 = build_initial_compaction_group_levels(
1810            1,
1811            &CompactionConfig {
1812                max_level: 6,
1813                ..Default::default()
1814            },
1815        );
1816        cg1.levels[0] = Level {
1817            level_idx: 1,
1818            level_type: LevelType::Nonoverlapping,
1819            table_infos: vec![
1820                SstableInfoInner {
1821                    object_id: 1.into(),
1822                    sst_id: 1.into(),
1823                    ..Default::default()
1824                }
1825                .into(),
1826            ],
1827            ..Default::default()
1828        };
1829        assert_eq!(
1830            version,
1831            HummockVersion {
1832                id: HummockVersionId::new(1),
1833                levels: HashMap::from_iter([
1834                    (
1835                        2,
1836                        build_initial_compaction_group_levels(
1837                            2,
1838                            &CompactionConfig {
1839                                max_level: 6,
1840                                ..Default::default()
1841                            },
1842                        ),
1843                    ),
1844                    (1, cg1),
1845                ]),
1846                ..Default::default()
1847            }
1848        );
1849    }
1850
1851    fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1852        gen_sst_info_impl(object_id, table_ids, left, right).into()
1853    }
1854
1855    fn gen_sst_info_impl(
1856        object_id: u64,
1857        table_ids: Vec<u32>,
1858        left: Bytes,
1859        right: Bytes,
1860    ) -> SstableInfoInner {
1861        SstableInfoInner {
1862            object_id: object_id.into(),
1863            sst_id: object_id.into(),
1864            key_range: KeyRange {
1865                left,
1866                right,
1867                right_exclusive: false,
1868            },
1869            table_ids,
1870            file_size: 100,
1871            sst_size: 100,
1872            uncompressed_file_size: 100,
1873            ..Default::default()
1874        }
1875    }
1876
1877    #[test]
1878    fn test_merge_levels() {
1879        let mut left_levels = build_initial_compaction_group_levels(
1880            1,
1881            &CompactionConfig {
1882                max_level: 6,
1883                ..Default::default()
1884            },
1885        );
1886
1887        let mut right_levels = build_initial_compaction_group_levels(
1888            2,
1889            &CompactionConfig {
1890                max_level: 6,
1891                ..Default::default()
1892            },
1893        );
1894
1895        left_levels.levels[0] = Level {
1896            level_idx: 1,
1897            level_type: LevelType::Nonoverlapping,
1898            table_infos: vec![
1899                gen_sst_info(
1900                    1,
1901                    vec![3],
1902                    FullKey::for_test(
1903                        TableId::new(3),
1904                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1905                        0,
1906                    )
1907                    .encode()
1908                    .into(),
1909                    FullKey::for_test(
1910                        TableId::new(3),
1911                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1912                        0,
1913                    )
1914                    .encode()
1915                    .into(),
1916                ),
1917                gen_sst_info(
1918                    10,
1919                    vec![3, 4],
1920                    FullKey::for_test(
1921                        TableId::new(3),
1922                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1923                        0,
1924                    )
1925                    .encode()
1926                    .into(),
1927                    FullKey::for_test(
1928                        TableId::new(4),
1929                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1930                        0,
1931                    )
1932                    .encode()
1933                    .into(),
1934                ),
1935                gen_sst_info(
1936                    11,
1937                    vec![4],
1938                    FullKey::for_test(
1939                        TableId::new(4),
1940                        gen_key_from_str(VirtualNode::from_index(11), "1"),
1941                        0,
1942                    )
1943                    .encode()
1944                    .into(),
1945                    FullKey::for_test(
1946                        TableId::new(4),
1947                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1948                        0,
1949                    )
1950                    .encode()
1951                    .into(),
1952                ),
1953            ],
1954            total_file_size: 300,
1955            ..Default::default()
1956        };
1957
1958        left_levels.l0.sub_levels.push(Level {
1959            level_idx: 0,
1960            table_infos: vec![gen_sst_info(
1961                3,
1962                vec![3],
1963                FullKey::for_test(
1964                    TableId::new(3),
1965                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1966                    0,
1967                )
1968                .encode()
1969                .into(),
1970                FullKey::for_test(
1971                    TableId::new(3),
1972                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1973                    0,
1974                )
1975                .encode()
1976                .into(),
1977            )],
1978            sub_level_id: 101,
1979            level_type: LevelType::Overlapping,
1980            total_file_size: 100,
1981            ..Default::default()
1982        });
1983
1984        left_levels.l0.sub_levels.push(Level {
1985            level_idx: 0,
1986            table_infos: vec![gen_sst_info(
1987                3,
1988                vec![3],
1989                FullKey::for_test(
1990                    TableId::new(3),
1991                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1992                    0,
1993                )
1994                .encode()
1995                .into(),
1996                FullKey::for_test(
1997                    TableId::new(3),
1998                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1999                    0,
2000                )
2001                .encode()
2002                .into(),
2003            )],
2004            sub_level_id: 103,
2005            level_type: LevelType::Overlapping,
2006            total_file_size: 100,
2007            ..Default::default()
2008        });
2009
2010        left_levels.l0.sub_levels.push(Level {
2011            level_idx: 0,
2012            table_infos: vec![gen_sst_info(
2013                3,
2014                vec![3],
2015                FullKey::for_test(
2016                    TableId::new(3),
2017                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2018                    0,
2019                )
2020                .encode()
2021                .into(),
2022                FullKey::for_test(
2023                    TableId::new(3),
2024                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2025                    0,
2026                )
2027                .encode()
2028                .into(),
2029            )],
2030            sub_level_id: 105,
2031            level_type: LevelType::Nonoverlapping,
2032            total_file_size: 100,
2033            ..Default::default()
2034        });
2035
2036        right_levels.levels[0] = Level {
2037            level_idx: 1,
2038            level_type: LevelType::Nonoverlapping,
2039            table_infos: vec![
2040                gen_sst_info(
2041                    1,
2042                    vec![5],
2043                    FullKey::for_test(
2044                        TableId::new(5),
2045                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2046                        0,
2047                    )
2048                    .encode()
2049                    .into(),
2050                    FullKey::for_test(
2051                        TableId::new(5),
2052                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2053                        0,
2054                    )
2055                    .encode()
2056                    .into(),
2057                ),
2058                gen_sst_info(
2059                    10,
2060                    vec![5, 6],
2061                    FullKey::for_test(
2062                        TableId::new(5),
2063                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2064                        0,
2065                    )
2066                    .encode()
2067                    .into(),
2068                    FullKey::for_test(
2069                        TableId::new(6),
2070                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2071                        0,
2072                    )
2073                    .encode()
2074                    .into(),
2075                ),
2076                gen_sst_info(
2077                    11,
2078                    vec![6],
2079                    FullKey::for_test(
2080                        TableId::new(6),
2081                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2082                        0,
2083                    )
2084                    .encode()
2085                    .into(),
2086                    FullKey::for_test(
2087                        TableId::new(6),
2088                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2089                        0,
2090                    )
2091                    .encode()
2092                    .into(),
2093                ),
2094            ],
2095            total_file_size: 300,
2096            ..Default::default()
2097        };
2098
2099        right_levels.l0.sub_levels.push(Level {
2100            level_idx: 0,
2101            table_infos: vec![gen_sst_info(
2102                3,
2103                vec![5],
2104                FullKey::for_test(
2105                    TableId::new(5),
2106                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2107                    0,
2108                )
2109                .encode()
2110                .into(),
2111                FullKey::for_test(
2112                    TableId::new(5),
2113                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2114                    0,
2115                )
2116                .encode()
2117                .into(),
2118            )],
2119            sub_level_id: 101,
2120            level_type: LevelType::Overlapping,
2121            total_file_size: 100,
2122            ..Default::default()
2123        });
2124
2125        right_levels.l0.sub_levels.push(Level {
2126            level_idx: 0,
2127            table_infos: vec![gen_sst_info(
2128                5,
2129                vec![5],
2130                FullKey::for_test(
2131                    TableId::new(5),
2132                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2133                    0,
2134                )
2135                .encode()
2136                .into(),
2137                FullKey::for_test(
2138                    TableId::new(5),
2139                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2140                    0,
2141                )
2142                .encode()
2143                .into(),
2144            )],
2145            sub_level_id: 102,
2146            level_type: LevelType::Overlapping,
2147            total_file_size: 100,
2148            ..Default::default()
2149        });
2150
2151        right_levels.l0.sub_levels.push(Level {
2152            level_idx: 0,
2153            table_infos: vec![gen_sst_info(
2154                3,
2155                vec![5],
2156                FullKey::for_test(
2157                    TableId::new(5),
2158                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2159                    0,
2160                )
2161                .encode()
2162                .into(),
2163                FullKey::for_test(
2164                    TableId::new(5),
2165                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2166                    0,
2167                )
2168                .encode()
2169                .into(),
2170            )],
2171            sub_level_id: 103,
2172            level_type: LevelType::Nonoverlapping,
2173            total_file_size: 100,
2174            ..Default::default()
2175        });
2176
2177        {
2178            // test empty
2179            let mut left_levels = Levels::default();
2180            let right_levels = Levels::default();
2181
2182            group_split::merge_levels(&mut left_levels, right_levels);
2183        }
2184
2185        {
2186            // test empty left
2187            let mut left_levels = build_initial_compaction_group_levels(
2188                1,
2189                &CompactionConfig {
2190                    max_level: 6,
2191                    ..Default::default()
2192                },
2193            );
2194            let right_levels = right_levels.clone();
2195
2196            group_split::merge_levels(&mut left_levels, right_levels);
2197
2198            assert!(left_levels.l0.sub_levels.len() == 3);
2199            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2200            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2201            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2202            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2203            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2204            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2205
2206            assert!(left_levels.levels[0].level_idx == 1);
2207            assert_eq!(300, left_levels.levels[0].total_file_size);
2208        }
2209
2210        {
2211            // test empty right
2212            let mut left_levels = left_levels.clone();
2213            let right_levels = build_initial_compaction_group_levels(
2214                2,
2215                &CompactionConfig {
2216                    max_level: 6,
2217                    ..Default::default()
2218                },
2219            );
2220
2221            group_split::merge_levels(&mut left_levels, right_levels);
2222
2223            assert!(left_levels.l0.sub_levels.len() == 3);
2224            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2225            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2226            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2227            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2228            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2229            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2230
2231            assert!(left_levels.levels[0].level_idx == 1);
2232            assert_eq!(300, left_levels.levels[0].total_file_size);
2233        }
2234
2235        {
2236            let mut left_levels = left_levels.clone();
2237            let right_levels = right_levels.clone();
2238
2239            group_split::merge_levels(&mut left_levels, right_levels);
2240
2241            assert!(left_levels.l0.sub_levels.len() == 6);
2242            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2243            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2244            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2245            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2246            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2247            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2248            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2249            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2250            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2251            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2252            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2253            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2254
2255            assert!(left_levels.levels[0].level_idx == 1);
2256            assert_eq!(600, left_levels.levels[0].total_file_size);
2257        }
2258    }
2259
2260    #[test]
2261    fn test_get_split_pos() {
2262        let epoch = test_epoch(1);
2263        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2264        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2265        let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2266
2267        let ssts = vec![s1, s2, s3];
2268        let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2269
2270        let pos = group_split::get_split_pos(&ssts, split_key.clone());
2271        assert_eq!(1, pos);
2272
2273        let pos = group_split::get_split_pos(&vec![], split_key);
2274        assert_eq!(0, pos);
2275    }
2276
2277    #[test]
2278    fn test_split_sst() {
2279        let epoch = test_epoch(1);
2280        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2281
2282        {
2283            let split_key = group_split::build_split_key(3, VirtualNode::ZERO);
2284            let origin_sst = sst.clone();
2285            let sst_size = origin_sst.sst_size;
2286            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2287            assert_eq!(SstSplitType::Both, split_type);
2288
2289            let mut new_sst_id = 10.into();
2290            let (origin_sst, branched_sst) = group_split::split_sst(
2291                origin_sst,
2292                &mut new_sst_id,
2293                split_key,
2294                sst_size / 2,
2295                sst_size / 2,
2296            );
2297
2298            let origin_sst = origin_sst.unwrap();
2299            let branched_sst = branched_sst.unwrap();
2300
2301            assert!(origin_sst.key_range.right_exclusive);
2302            assert!(
2303                origin_sst
2304                    .key_range
2305                    .right
2306                    .cmp(&branched_sst.key_range.left)
2307                    .is_le()
2308            );
2309            assert!(origin_sst.table_ids.is_sorted());
2310            assert!(branched_sst.table_ids.is_sorted());
2311            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2312            assert!(branched_sst.sst_size < origin_sst.file_size);
2313            assert_eq!(10, branched_sst.sst_id);
2314            assert_eq!(11, origin_sst.sst_id);
2315            assert_eq!(&3, branched_sst.table_ids.first().unwrap()); // split table_id to right
2316        }
2317
2318        {
2319            // test un-exist table_id
2320            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2321            let origin_sst = sst.clone();
2322            let sst_size = origin_sst.sst_size;
2323            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2324            assert_eq!(SstSplitType::Both, split_type);
2325
2326            let mut new_sst_id = 10.into();
2327            let (origin_sst, branched_sst) = group_split::split_sst(
2328                origin_sst,
2329                &mut new_sst_id,
2330                split_key,
2331                sst_size / 2,
2332                sst_size / 2,
2333            );
2334
2335            let origin_sst = origin_sst.unwrap();
2336            let branched_sst = branched_sst.unwrap();
2337
2338            assert!(origin_sst.key_range.right_exclusive);
2339            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2340            assert!(origin_sst.table_ids.is_sorted());
2341            assert!(branched_sst.table_ids.is_sorted());
2342            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2343            assert!(branched_sst.sst_size < origin_sst.file_size);
2344            assert_eq!(10, branched_sst.sst_id);
2345            assert_eq!(11, origin_sst.sst_id);
2346            assert_eq!(&5, branched_sst.table_ids.first().unwrap()); // split table_id to right
2347        }
2348
2349        {
2350            let split_key = group_split::build_split_key(6, VirtualNode::ZERO);
2351            let origin_sst = sst.clone();
2352            let split_type = group_split::need_to_split(&origin_sst, split_key);
2353            assert_eq!(SstSplitType::Left, split_type);
2354        }
2355
2356        {
2357            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2358            let origin_sst = sst.clone();
2359            let split_type = group_split::need_to_split(&origin_sst, split_key);
2360            assert_eq!(SstSplitType::Both, split_type);
2361
2362            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2363            let origin_sst = sst.clone();
2364            let split_type = group_split::need_to_split(&origin_sst, split_key);
2365            assert_eq!(SstSplitType::Right, split_type);
2366        }
2367
2368        {
2369            // test key_range left = right
2370            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2371            sst.key_range.right = sst.key_range.left.clone();
2372            let sst: SstableInfo = sst.into();
2373            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2374            let origin_sst = sst.clone();
2375            let sst_size = origin_sst.sst_size;
2376
2377            let mut new_sst_id = 10.into();
2378            let (origin_sst, branched_sst) = group_split::split_sst(
2379                origin_sst,
2380                &mut new_sst_id,
2381                split_key,
2382                sst_size / 2,
2383                sst_size / 2,
2384            );
2385
2386            assert!(origin_sst.is_none());
2387            assert!(branched_sst.is_some());
2388        }
2389    }
2390
2391    #[test]
2392    fn test_split_sst_info_for_level() {
2393        let mut version = HummockVersion {
2394            id: HummockVersionId(0),
2395            levels: HashMap::from_iter([(
2396                1,
2397                build_initial_compaction_group_levels(
2398                    1,
2399                    &CompactionConfig {
2400                        max_level: 6,
2401                        ..Default::default()
2402                    },
2403                ),
2404            )]),
2405            ..Default::default()
2406        };
2407
2408        let cg1 = version.levels.get_mut(&1).unwrap();
2409
2410        cg1.levels[0] = Level {
2411            level_idx: 1,
2412            level_type: LevelType::Nonoverlapping,
2413            table_infos: vec![
2414                gen_sst_info(
2415                    1,
2416                    vec![3],
2417                    FullKey::for_test(
2418                        TableId::new(3),
2419                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2420                        0,
2421                    )
2422                    .encode()
2423                    .into(),
2424                    FullKey::for_test(
2425                        TableId::new(3),
2426                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2427                        0,
2428                    )
2429                    .encode()
2430                    .into(),
2431                ),
2432                gen_sst_info(
2433                    10,
2434                    vec![3, 4],
2435                    FullKey::for_test(
2436                        TableId::new(3),
2437                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2438                        0,
2439                    )
2440                    .encode()
2441                    .into(),
2442                    FullKey::for_test(
2443                        TableId::new(4),
2444                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2445                        0,
2446                    )
2447                    .encode()
2448                    .into(),
2449                ),
2450                gen_sst_info(
2451                    11,
2452                    vec![4],
2453                    FullKey::for_test(
2454                        TableId::new(4),
2455                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2456                        0,
2457                    )
2458                    .encode()
2459                    .into(),
2460                    FullKey::for_test(
2461                        TableId::new(4),
2462                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2463                        0,
2464                    )
2465                    .encode()
2466                    .into(),
2467                ),
2468            ],
2469            total_file_size: 300,
2470            ..Default::default()
2471        };
2472
2473        cg1.l0.sub_levels.push(Level {
2474            level_idx: 0,
2475            table_infos: vec![
2476                gen_sst_info(
2477                    2,
2478                    vec![2],
2479                    FullKey::for_test(
2480                        TableId::new(0),
2481                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2482                        0,
2483                    )
2484                    .encode()
2485                    .into(),
2486                    FullKey::for_test(
2487                        TableId::new(2),
2488                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2489                        0,
2490                    )
2491                    .encode()
2492                    .into(),
2493                ),
2494                gen_sst_info(
2495                    22,
2496                    vec![2],
2497                    FullKey::for_test(
2498                        TableId::new(0),
2499                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2500                        0,
2501                    )
2502                    .encode()
2503                    .into(),
2504                    FullKey::for_test(
2505                        TableId::new(2),
2506                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2507                        0,
2508                    )
2509                    .encode()
2510                    .into(),
2511                ),
2512                gen_sst_info(
2513                    23,
2514                    vec![2],
2515                    FullKey::for_test(
2516                        TableId::new(0),
2517                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2518                        0,
2519                    )
2520                    .encode()
2521                    .into(),
2522                    FullKey::for_test(
2523                        TableId::new(2),
2524                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2525                        0,
2526                    )
2527                    .encode()
2528                    .into(),
2529                ),
2530                gen_sst_info(
2531                    24,
2532                    vec![2],
2533                    FullKey::for_test(
2534                        TableId::new(2),
2535                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2536                        0,
2537                    )
2538                    .encode()
2539                    .into(),
2540                    FullKey::for_test(
2541                        TableId::new(2),
2542                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2543                        0,
2544                    )
2545                    .encode()
2546                    .into(),
2547                ),
2548                gen_sst_info(
2549                    25,
2550                    vec![2],
2551                    FullKey::for_test(
2552                        TableId::new(0),
2553                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2554                        0,
2555                    )
2556                    .encode()
2557                    .into(),
2558                    FullKey::for_test(
2559                        TableId::new(0),
2560                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2561                        0,
2562                    )
2563                    .encode()
2564                    .into(),
2565                ),
2566            ],
2567            sub_level_id: 101,
2568            level_type: LevelType::Overlapping,
2569            total_file_size: 300,
2570            ..Default::default()
2571        });
2572
2573        {
2574            // split Overlapping level
2575            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2576
2577            let mut new_sst_id = 100.into();
2578            let x = group_split::split_sst_info_for_level_v2(
2579                &mut cg1.l0.sub_levels[0],
2580                &mut new_sst_id,
2581                split_key,
2582            );
2583            // assert_eq!(3, x.len());
2584            // assert_eq!(100, x[0].sst_id);
2585            // assert_eq!(100, x[0].sst_size);
2586            // assert_eq!(101, x[1].sst_id);
2587            // assert_eq!(100, x[1].sst_size);
2588            // assert_eq!(102, x[2].sst_id);
2589            // assert_eq!(100, x[2].sst_size);
2590
2591            let mut right_l0 = OverlappingLevel {
2592                sub_levels: vec![],
2593                total_file_size: 0,
2594                uncompressed_file_size: 0,
2595            };
2596
2597            right_l0.sub_levels.push(Level {
2598                level_idx: 0,
2599                table_infos: x,
2600                sub_level_id: 101,
2601                total_file_size: 100,
2602                level_type: LevelType::Overlapping,
2603                ..Default::default()
2604            });
2605
2606            let right_levels = Levels {
2607                levels: vec![],
2608                l0: right_l0,
2609                ..Default::default()
2610            };
2611
2612            merge_levels(cg1, right_levels);
2613        }
2614
2615        {
2616            // test split empty level
2617            let mut new_sst_id = 100.into();
2618            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2619            let x = group_split::split_sst_info_for_level_v2(
2620                &mut cg1.levels[2],
2621                &mut new_sst_id,
2622                split_key,
2623            );
2624
2625            assert!(x.is_empty());
2626        }
2627
2628        {
2629            // test split to right Nonoverlapping level
2630            let mut cg1 = cg1.clone();
2631            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2632
2633            let mut new_sst_id = 100.into();
2634            let x = group_split::split_sst_info_for_level_v2(
2635                &mut cg1.levels[0],
2636                &mut new_sst_id,
2637                split_key,
2638            );
2639
2640            assert_eq!(3, x.len());
2641            assert_eq!(1, x[0].sst_id);
2642            assert_eq!(100, x[0].sst_size);
2643            assert_eq!(10, x[1].sst_id);
2644            assert_eq!(100, x[1].sst_size);
2645            assert_eq!(11, x[2].sst_id);
2646            assert_eq!(100, x[2].sst_size);
2647
2648            assert_eq!(0, cg1.levels[0].table_infos.len());
2649        }
2650
2651        {
2652            // test split to left Nonoverlapping level
2653            let mut cg1 = cg1.clone();
2654            let split_key = group_split::build_split_key(5, VirtualNode::ZERO);
2655
2656            let mut new_sst_id = 100.into();
2657            let x = group_split::split_sst_info_for_level_v2(
2658                &mut cg1.levels[0],
2659                &mut new_sst_id,
2660                split_key,
2661            );
2662
2663            assert_eq!(0, x.len());
2664            assert_eq!(3, cg1.levels[0].table_infos.len());
2665        }
2666
2667        // {
2668        //     // test split to both Nonoverlapping level
2669        //     let mut cg1 = cg1.clone();
2670        //     let split_key = build_split_key(3, VirtualNode::MAX);
2671
2672        //     let mut new_sst_id = 100;
2673        //     let x = group_split::split_sst_info_for_level_v2(
2674        //         &mut cg1.levels[0],
2675        //         &mut new_sst_id,
2676        //         split_key,
2677        //     );
2678
2679        //     assert_eq!(2, x.len());
2680        //     assert_eq!(100, x[0].sst_id);
2681        //     assert_eq!(100 / 2, x[0].sst_size);
2682        //     assert_eq!(11, x[1].sst_id);
2683        //     assert_eq!(100, x[1].sst_size);
2684        //     assert_eq!(vec![3, 4], x[0].table_ids);
2685
2686        //     assert_eq!(2, cg1.levels[0].table_infos.len());
2687        //     assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2688        //     assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2689        //     assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2690        // }
2691
2692        {
2693            // test split to both Nonoverlapping level
2694            let mut cg1 = cg1.clone();
2695            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2696
2697            let mut new_sst_id = 100.into();
2698            let x = group_split::split_sst_info_for_level_v2(
2699                &mut cg1.levels[0],
2700                &mut new_sst_id,
2701                split_key,
2702            );
2703
2704            assert_eq!(2, x.len());
2705            assert_eq!(100, x[0].sst_id);
2706            assert_eq!(100 / 2, x[0].sst_size);
2707            assert_eq!(11, x[1].sst_id);
2708            assert_eq!(100, x[1].sst_size);
2709            assert_eq!(vec![4], x[1].table_ids);
2710
2711            assert_eq!(2, cg1.levels[0].table_infos.len());
2712            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2713            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2714            assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2715        }
2716    }
2717}