risingwave_hummock_sdk/compaction_group/hummock_version_ext.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::iter::once;
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::log::LogSuppressor;
use risingwave_pb::hummock::{
    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
};
use tracing::warn;

use super::group_split::split_sst_with_table_ids;
use super::{StateTableId, group_split};
use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
use crate::compact_task::is_compaction_task_expired;
use crate::compaction_group::StaticCompactionGroupId;
use crate::key_range::KeyRangeCommon;
use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
use crate::sstable_info::SstableInfo;
use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
use crate::vector_index::apply_vector_index_delta;
use crate::version::{
    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
};
use crate::{
    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
};

#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    pub insert_sst_level: u32,
    pub insert_sst_infos: Vec<SstableInfo>,
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}

pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;

impl<L> HummockVersionCommon<SstableInfo, L> {
    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
        self.levels
            .get(&compaction_group_id)
            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
    }

    pub fn get_compaction_group_levels_mut(
        &mut self,
        compaction_group_id: CompactionGroupId,
    ) -> &mut Levels {
        self.levels
            .get_mut(&compaction_group_id)
            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
    }

    // only scan the sst infos from levels in the specified compaction group (without table change log)
    pub fn get_sst_ids_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> impl Iterator<Item = HummockSstableId> + '_ {
        self.levels
            .iter()
            .filter_map(move |(cg_id, level)| {
                if *cg_id == compaction_group_id {
                    Some(level)
                } else {
                    None
                }
            })
            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
            .flat_map(|level| level.table_infos.iter())
            .map(|s| s.sst_id)
    }

    pub fn level_iter<F: FnMut(&Level) -> bool>(
        &self,
        compaction_group_id: CompactionGroupId,
        mut f: F,
    ) {
        if let Some(levels) = self.levels.get(&compaction_group_id) {
            for sub_level in &levels.l0.sub_levels {
                if !f(sub_level) {
                    return;
                }
            }
            for level in &levels.levels {
                if !f(level) {
                    return;
                }
            }
        }
    }

    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
        // l0 is currently separated from all levels
        self.levels
            .get(&compaction_group_id)
            .map(|group| group.levels.len() + 1)
            .unwrap_or(0)
    }

    pub fn safe_epoch_table_watermarks(
        &self,
        existing_table_ids: &[TableId],
    ) -> BTreeMap<TableId, TableWatermarks> {
        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
    }
}

pub fn safe_epoch_table_watermarks_impl(
    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
    existing_table_ids: &[TableId],
) -> BTreeMap<TableId, TableWatermarks> {
    fn extract_single_table_watermark(
        table_watermarks: &TableWatermarks,
    ) -> Option<TableWatermarks> {
        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
            Some(TableWatermarks {
                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
                direction: table_watermarks.direction,
                watermark_type: table_watermarks.watermark_type,
            })
        } else {
            None
        }
    }
    table_watermarks
        .iter()
        .filter_map(|(table_id, table_watermarks)| {
            if !existing_table_ids.contains(table_id) {
                None
            } else {
                extract_single_table_watermark(table_watermarks)
                    .map(|table_watermarks| (*table_id, table_watermarks))
            }
        })
        .collect()
}

pub fn safe_epoch_read_table_watermarks_impl(
    safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
) -> BTreeMap<TableId, ReadTableWatermark> {
    safe_epoch_watermarks
        .into_iter()
        .map(|(table_id, watermarks)| {
            assert_eq!(watermarks.watermarks.len(), 1);
            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
            let mut vnode_watermark_map = BTreeMap::new();
            for vnode_watermark in vnode_watermarks.iter() {
                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                    assert!(
                        vnode_watermark_map
                            .insert(vnode, watermark.clone())
                            .is_none(),
                        "duplicate table watermark on vnode {}",
                        vnode.to_index()
                    );
                }
            }
            (
                table_id,
                ReadTableWatermark {
                    direction: watermarks.direction,
                    vnode_watermarks: vnode_watermark_map,
                },
            )
        })
        .collect()
}

impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
    pub fn count_new_ssts_in_group_split(
        &self,
        parent_group_id: CompactionGroupId,
        split_key: Bytes,
    ) -> u64 {
        self.levels
            .get(&parent_group_id)
            .map_or(0, |parent_levels| {
                let l0 = &parent_levels.l0;
                let mut split_count = 0;
                for sub_level in &l0.sub_levels {
                    assert!(!sub_level.table_infos.is_empty());

                    if sub_level.level_type == PbLevelType::Overlapping {
                        // TODO: use table_id / vnode / key_range filter
                        split_count += sub_level
                            .table_infos
                            .iter()
                            .map(|sst| {
                                if let group_split::SstSplitType::Both =
                                    group_split::need_to_split(sst, split_key.clone())
                                {
                                    2
                                } else {
                                    0
                                }
                            })
                            .sum::<u64>();
                        continue;
                    }

                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
                    let sst = sub_level.table_infos.get(pos).unwrap();

                    if let group_split::SstSplitType::Both =
                        group_split::need_to_split(sst, split_key.clone())
                    {
                        split_count += 2;
                    }
                }

                for level in &parent_levels.levels {
                    if level.table_infos.is_empty() {
                        continue;
                    }
                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
                    let sst = level.table_infos.get(pos).unwrap();
                    if let group_split::SstSplitType::Both =
                        group_split::need_to_split(sst, split_key.clone())
                    {
                        split_count += 2;
                    }
                }

                split_count
            })
    }

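    /// Moves the SSTs belonging to `member_table_ids` out of `parent_group_id` and into the newly
    /// created `group_id`, branching shared SSTs and bumping both groups'
    /// `compaction_group_version_id` so that pre-split compaction tasks are treated as expired.
    ///
    /// Illustrative sketch only; the table id and the surrounding variables are placeholders, not
    /// values from this crate:
    ///
    /// ```ignore
    /// // Branch table 42 off `parent_group_id` into `new_group_id`; branched SSTs get ids
    /// // starting from `new_sst_start_id`.
    /// version.init_with_parent_group(
    ///     parent_group_id,
    ///     new_group_id,
    ///     BTreeSet::from([TableId::new(42)]),
    ///     new_sst_start_id,
    /// );
    /// ```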
    pub fn init_with_parent_group(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        member_table_ids: BTreeSet<StateTableId>,
        new_sst_start_id: HummockSstableId,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from",
                parent_group_id
            );
        }
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operations, e.g. split, any ongoing compaction task created before the operation should be rejected as expired.
        // By incrementing the compaction_group_version_id of the compaction group and comparing it with the one recorded in the compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Removing SSTs from a sub level may leave it empty. An empty sub level will be
                // purged whenever another compaction task finishes.
                let insert_table_infos =
                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                if insert_table_infos.is_empty() {
                    continue;
                }
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos =
                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }

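    /// Summarizes, per compaction group, which SSTs a version delta inserts and which SST objects
    /// it removes, e.g. for a cache refiller. Trivial-move deltas yield no entries because the
    /// underlying objects do not change.
    ///
    /// Hedged usage sketch; `refill` and `evict` are placeholders for whatever consumes the result:
    ///
    /// ```ignore
    /// for info in version.build_sst_delta_infos(&version_delta) {
    ///     refill(info.insert_sst_level, &info.insert_sst_infos);
    ///     evict(&info.delete_sst_object_ids);
    /// }
    /// ```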
    pub fn build_sst_delta_infos(
        &self,
        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
    ) -> Vec<SstDeltaInfo> {
        let mut infos = vec![];

        // Skip trivial-move deltas for the refiller.
        // A trivial-move task only changes the position of an SST within the LSM tree; it does not
        // modify the object behind the SST, so the refill does not need to be re-executed.
        if version_delta.trivial_move {
            return infos;
        }

        for (group_id, group_deltas) in &version_delta.group_deltas {
            let mut info = SstDeltaInfo::default();

            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();

            // Build only if all deltas are intra level deltas.
            if !group_deltas.group_deltas.iter().all(|delta| {
                matches!(
                    delta,
                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
                )
            }) {
                continue;
            }

            // TODO(MrCroxx): At most one insert delta is allowed here. It's okay for now with the
            // current `hummock::manager::gen_version_delta` implementation. Better to refactor the
            // struct to reduce such implicit conventions.
            for group_delta in &group_deltas.group_deltas {
                match group_delta {
                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                        if !inserted_table_infos.is_empty() {
                            info.insert_sst_level = 0;
                            info.insert_sst_infos
                                .extend(inserted_table_infos.iter().cloned());
                        }
                    }
                    GroupDeltaCommon::IntraLevel(intra_level) => {
                        if !intra_level.inserted_table_infos.is_empty() {
                            info.insert_sst_level = intra_level.level_idx;
                            info.insert_sst_infos
                                .extend(intra_level.inserted_table_infos.iter().cloned());
                        }
                        if !intra_level.removed_table_ids.is_empty() {
                            for id in &intra_level.removed_table_ids {
                                if intra_level.level_idx == 0 {
                                    removed_l0_ssts.insert(*id);
                                } else {
                                    removed_ssts
                                        .entry(intra_level.level_idx)
                                        .or_default()
                                        .insert(*id);
                                }
                            }
                        }
                    }
                    GroupDeltaCommon::GroupConstruct(_)
                    | GroupDeltaCommon::GroupDestroy(_)
                    | GroupDeltaCommon::GroupMerge(_)
                    | GroupDeltaCommon::TruncateTables(_) => {}
                }
            }

            let group = self.levels.get(group_id).unwrap();
            for l0_sub_level in &group.level0().sub_levels {
                for sst_info in &l0_sub_level.table_infos {
                    if removed_l0_ssts.remove(&sst_info.sst_id) {
                        info.delete_sst_object_ids.push(sst_info.object_id);
                    }
                }
            }
            for level in &group.levels {
                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
                    for sst_info in &level.table_infos {
                        if removed_level_ssts.remove(&sst_info.sst_id) {
                            info.delete_sst_object_ids.push(sst_info.object_id);
                        }
                    }
                    if !removed_level_ssts.is_empty() {
                        tracing::error!(
                            "removed_level_ssts is not empty: {:?}",
                            removed_level_ssts,
                        );
                    }
                    debug_assert!(removed_level_ssts.is_empty());
                }
            }

            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
                tracing::error!(
                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
                    removed_l0_ssts,
                    removed_ssts
                );
            }
            debug_assert!(removed_l0_ssts.is_empty());
            debug_assert!(removed_ssts.is_empty());

            infos.push(info);
        }

        infos
    }

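    /// Applies a version delta in place, advancing `self.id` to `version_delta.id`. The delta must
    /// be built against the current version, i.e. `version_delta.prev_id == self.id`.
    ///
    /// Minimal sketch of the expected call pattern:
    ///
    /// ```ignore
    /// assert_eq!(version.id, version_delta.prev_id);
    /// version.apply_version_delta(&version_delta);
    /// assert_eq!(version.id, version_delta.id);
    /// ```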
    pub fn apply_version_delta(
        &mut self,
        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
    ) {
        assert_eq!(self.id, version_delta.prev_id);

        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
            &version_delta.state_table_info_delta,
            &version_delta.removed_table_ids,
        );

        #[expect(deprecated)]
        {
            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
                is_commit_epoch = true;
                tracing::trace!(
                    "max committed epoch bumped but no table committed epoch is changed"
                );
            }
        }

        // Apply to `levels`, i.e. the levels of each compaction group.
        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
            let mut is_applied_l0_compact = false;
            for group_delta in &group_deltas.group_deltas {
                match group_delta {
                    GroupDeltaCommon::GroupConstruct(group_construct) => {
                        let mut new_levels = build_initial_compaction_group_levels(
                            *compaction_group_id,
                            group_construct.get_group_config().unwrap(),
                        );
                        let parent_group_id = group_construct.parent_group_id;
                        new_levels.parent_group_id = parent_group_id;
                        #[expect(deprecated)]
                        // for backward-compatibility of previous hummock version delta
                        new_levels
                            .member_table_ids
                            .clone_from(&group_construct.table_ids);
                        self.levels.insert(*compaction_group_id, new_levels);
                        let member_table_ids = if group_construct.version()
                            >= CompatibilityVersion::NoMemberTableIds
                        {
                            self.state_table_info
                                .compaction_group_member_table_ids(*compaction_group_id)
                                .iter()
                                .copied()
                                .collect()
                        } else {
                            #[expect(deprecated)]
                            // for backward-compatibility of previous hummock version delta
                            BTreeSet::from_iter(
                                group_construct.table_ids.iter().copied().map(Into::into),
                            )
                        };

                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
                            let split_key = if group_construct.split_key.is_some() {
                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
                            } else {
                                None
                            };
                            self.init_with_parent_group_v2(
                                parent_group_id,
                                *compaction_group_id,
                                group_construct.get_new_sst_start_id().into(),
                                split_key.clone(),
                            );
                        } else {
                            // for backward-compatibility of previous hummock version delta
                            self.init_with_parent_group(
                                parent_group_id,
                                *compaction_group_id,
                                member_table_ids,
                                group_construct.get_new_sst_start_id().into(),
                            );
                        }
                    }
                    GroupDeltaCommon::GroupMerge(group_merge) => {
                        tracing::info!(
                            "group_merge left {:?} right {:?}",
                            group_merge.left_group_id,
                            group_merge.right_group_id
                        );
                        self.merge_compaction_group(
                            group_merge.left_group_id,
                            group_merge.right_group_id,
                        )
                    }
                    GroupDeltaCommon::IntraLevel(level_delta) => {
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });
                        if is_commit_epoch {
                            assert!(
                                level_delta.removed_table_ids.is_empty(),
                                "no sst should be deleted when committing an epoch"
                            );

                            let IntraLevelDelta {
                                level_idx,
                                l0_sub_level_id,
                                inserted_table_infos,
                                ..
                            } = level_delta;
                            {
                                assert_eq!(
                                    *level_idx, 0,
                                    "we should only add to L0 when we commit an epoch."
                                );
                                if !inserted_table_infos.is_empty() {
                                    insert_new_sub_level(
                                        &mut levels.l0,
                                        *l0_sub_level_id,
                                        PbLevelType::Overlapping,
                                        inserted_table_infos.clone(),
                                        None,
                                    );
                                }
                            }
                        } else {
                            // The delta is caused by compaction.
                            levels.apply_compact_ssts(
                                level_delta,
                                self.state_table_info
                                    .compaction_group_member_table_ids(*compaction_group_id),
                            );
                            if level_delta.level_idx == 0 {
                                is_applied_l0_compact = true;
                            }
                        }
                    }
                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });
                        assert!(is_commit_epoch);

                        if !inserted_table_infos.is_empty() {
                            let next_l0_sub_level_id = levels
                                .l0
                                .sub_levels
                                .last()
                                .map(|level| level.sub_level_id + 1)
                                .unwrap_or(1);

                            insert_new_sub_level(
                                &mut levels.l0,
                                next_l0_sub_level_id,
                                PbLevelType::Overlapping,
                                inserted_table_infos.clone(),
                                None,
                            );
                        }
                    }
                    GroupDeltaCommon::GroupDestroy(_) => {
                        self.levels.remove(compaction_group_id);
                    }

                    GroupDeltaCommon::TruncateTables(table_ids) => {
                        // Iterate all levels and drop the given table ids from the contained SSTs.
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });

                        for sub_level in &mut levels.l0.sub_levels {
                            sub_level.table_infos.iter_mut().for_each(|sstable_info| {
                                let mut inner = sstable_info.get_inner();
                                inner.table_ids.retain(|id| !table_ids.contains(id));
                                sstable_info.set_inner(inner);
                            });
                        }

                        for level in &mut levels.levels {
                            level.table_infos.iter_mut().for_each(|sstable_info| {
                                let mut inner = sstable_info.get_inner();
                                inner.table_ids.retain(|id| !table_ids.contains(id));
                                sstable_info.set_inner(inner);
                            });
                        }

                        levels.compaction_group_version_id += 1;
                    }
                }
            }

            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
            {
                levels.post_apply_l0_compact();
            }
        }
        self.id = version_delta.id;
        #[expect(deprecated)]
        {
            self.max_committed_epoch = version_delta.max_committed_epoch;
        }

        // Apply to table watermarks.

        // Stores the table watermarks that need to be updated. `None` means the table watermark of
        // the table id should be removed.
        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
            HashMap::new();

        // Merge the new table watermarks from the delta.
        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
                if version_delta.removed_table_ids.contains(table_id) {
                    modified_table_watermarks.insert(*table_id, None);
                } else {
                    let mut current_table_watermarks = (**current_table_watermarks).clone();
                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
                }
            } else {
                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
            }
        }
        for (table_id, table_watermarks) in &self.table_watermarks {
            let safe_epoch = if let Some(state_table_info) =
                self.state_table_info.info().get(table_id)
                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
                && state_table_info.committed_epoch > *oldest_epoch
            {
                // The safe epoch has progressed; stale watermarks need to be cleared.
                state_table_info.committed_epoch
            } else {
                // The safe epoch has not progressed or the table has been removed. No need to truncate.
                continue;
            };
            let table_watermarks = modified_table_watermarks
                .entry(*table_id)
                .or_insert_with(|| Some((**table_watermarks).clone()));
            if let Some(table_watermarks) = table_watermarks {
                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
            }
        }
        // Apply the modified table watermarks to the hummock version.
        for (table_id, table_watermarks) in modified_table_watermarks {
            if let Some(table_watermarks) = table_watermarks {
                self.table_watermarks
                    .insert(table_id, Arc::new(table_watermarks));
            } else {
                self.table_watermarks.remove(&table_id);
            }
        }

        // Apply to the table change log.
        Self::apply_change_log_delta(
            &mut self.table_change_log,
            &version_delta.change_log_delta,
            &version_delta.removed_table_ids,
            &version_delta.state_table_info_delta,
            &changed_table_info,
        );

        // Apply to the vector indexes.
        apply_vector_index_delta(
            &mut self.vector_indexes,
            &version_delta.vector_index_delta,
            &version_delta.removed_table_ids,
        );
    }

    pub fn apply_change_log_delta<T: Clone>(
        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
        removed_table_ids: &HashSet<TableId>,
        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
    ) {
        for (table_id, change_log_delta) in change_log_delta {
            let new_change_log = &change_log_delta.new_log;
            match table_change_log.entry(*table_id) {
                Entry::Occupied(entry) => {
                    let change_log = entry.into_mut();
                    change_log.add_change_log(new_change_log.clone());
                }
                Entry::Vacant(entry) => {
                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
                }
            };
        }

        // If a table has no new change log entry (not even an empty one), it means we have stopped
        // maintaining the change log for that table, and its table change log is removed.
        // The table change log is also removed when the table id itself is removed.
        table_change_log.retain(|table_id, _| {
            if removed_table_ids.contains(table_id) {
                return false;
            }
            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
                // The table existed previously, and its committed epoch has progressed.
            } else {
                // Otherwise, the table change log should be kept anyway.
                return true;
            }
            let contains = change_log_delta.contains_key(table_id);
            if !contains {
                static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                    LazyLock::new(|| LogSuppressor::per_second(1));
                if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                    warn!(
                        suppressed_count,
                        %table_id,
                        "table change log dropped due to no further change log at newly committed epoch"
                    );
                }
            }
            contains
        });

        // Truncate the remaining table change logs.
        for (table_id, change_log_delta) in change_log_delta {
            if let Some(change_log) = table_change_log.get_mut(table_id) {
                change_log.truncate(change_log_delta.truncate_epoch);
            }
        }
    }

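    /// Collects branched SSTs: an SST is branched when its `sst_id` differs from its `object_id`,
    /// i.e. multiple compaction groups reference the same underlying object under different sst
    /// ids. The result maps each such object id to the referencing sst ids, per compaction group.
    ///
    /// Illustrative sketch of inspecting the result:
    ///
    /// ```ignore
    /// for (object_id, group_to_sst_ids) in version.build_branched_sst_info() {
    ///     for (group_id, sst_ids) in group_to_sst_ids {
    ///         tracing::debug!(?object_id, group_id, ?sst_ids, "branched sst");
    ///     }
    /// }
    /// ```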
    pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
        let mut ret: BTreeMap<_, _> = BTreeMap::new();
        for (compaction_group_id, group) in &self.levels {
            let mut levels = vec![];
            levels.extend(group.l0.sub_levels.iter());
            levels.extend(group.levels.iter());
            for level in levels {
                for table_info in &level.table_infos {
                    if table_info.sst_id.inner() == table_info.object_id.inner() {
                        continue;
                    }
                    let object_id = table_info.object_id;
                    let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
                    entry
                        .entry(*compaction_group_id)
                        .or_default()
                        .push(table_info.sst_id)
                }
            }
        }
        ret
    }

    pub fn merge_compaction_group(
        &mut self,
        left_group_id: CompactionGroupId,
        right_group_id: CompactionGroupId,
    ) {
        // Double check that the member table ids of both groups are sorted as a whole.
        let left_group_id_table_ids = self
            .state_table_info
            .compaction_group_member_table_ids(left_group_id)
            .iter();
        let right_group_id_table_ids = self
            .state_table_info
            .compaction_group_member_table_ids(right_group_id)
            .iter();

        assert!(
            left_group_id_table_ids
                .chain(right_group_id_table_ids)
                .is_sorted()
        );

        let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
        let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
            panic!(
                "compaction group should exist right {} all {:?}",
                right_group_id, total_cg
            )
        });

        let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
            panic!(
                "compaction group should exist left {} all {:?}",
                left_group_id, total_cg
            )
        });

        group_split::merge_levels(left_levels, right_levels);
    }

    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: HummockSstableId,
        split_key: Option<Bytes>,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operations, e.g. split, any ongoing compaction task created before the operation should be rejected as expired.
        // By incrementing the compaction_group_version_id of the compaction group and comparing it with the one recorded in the compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Removing SSTs from a sub level may leave it empty. An empty sub level will be
                // purged whenever another compaction task finishes.
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                if insert_table_infos.is_empty() {
                    continue;
                }

                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }

        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
}

impl<T> HummockVersionCommon<T>
where
    T: SstableIdReader + ObjectIdReader,
{
    pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
        self.get_sst_infos(exclude_change_log)
            .map(|s| s.object_id())
            .collect()
    }

    pub fn get_object_ids(
        &self,
        exclude_change_log: bool,
    ) -> impl Iterator<Item = HummockObjectId> + '_ {
        // DO NOT REMOVE THIS LINE
        // This is to ensure that when adding a new variant to `HummockObjectId`,
        // the compiler will warn us if we forget to handle it here.
        match HummockObjectId::Sstable(0.into()) {
            HummockObjectId::Sstable(_) => {}
            HummockObjectId::VectorFile(_) => {}
            HummockObjectId::HnswGraphFile(_) => {}
        };
        self.get_sst_infos(exclude_change_log)
            .map(|s| HummockObjectId::Sstable(s.object_id()))
            .chain(
                self.vector_indexes
                    .values()
                    .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
            )
    }

    pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
        self.get_sst_infos(exclude_change_log)
            .map(|s| s.sst_id())
            .collect()
    }

    pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
        let may_table_change_log = if exclude_change_log {
            None
        } else {
            Some(self.table_change_log.values())
        };
        self.get_combined_levels()
            .flat_map(|level| level.table_infos.iter())
            .chain(
                may_table_change_log
                    .map(|v| {
                        v.flat_map(|table_change_log| {
                            table_change_log.iter().flat_map(|epoch_change_log| {
                                epoch_change_log
                                    .old_value
                                    .iter()
                                    .chain(epoch_change_log.new_value.iter())
                            })
                        })
                    })
                    .into_iter()
                    .flatten(),
            )
    }
}

impl Levels {
    pub(crate) fn apply_compact_ssts(
        &mut self,
        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
        member_table_ids: &BTreeSet<TableId>,
    ) {
        let IntraLevelDeltaCommon {
            level_idx,
            l0_sub_level_id,
            inserted_table_infos: insert_table_infos,
            vnode_partition_count,
            removed_table_ids: delete_sst_ids_set,
            compaction_group_version_id,
        } = level_delta;
        let new_vnode_partition_count = *vnode_partition_count;

        if is_compaction_task_expired(
            self.compaction_group_version_id,
            *compaction_group_version_id,
        ) {
            warn!(
                current_compaction_group_version_id = self.compaction_group_version_id,
                delta_compaction_group_version_id = compaction_group_version_id,
                level_idx,
                l0_sub_level_id,
                insert_table_infos = ?insert_table_infos
                    .iter()
                    .map(|sst| (sst.sst_id, sst.object_id))
                    .collect_vec(),
                ?delete_sst_ids_set,
                "This VersionDelta may be committed by an expired compact task. Please check it."
            );
            return;
        }
        if !delete_sst_ids_set.is_empty() {
            if *level_idx == 0 {
                for level in &mut self.l0.sub_levels {
                    level_delete_ssts(level, delete_sst_ids_set);
                }
            } else {
                let idx = *level_idx as usize - 1;
                level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
            }
        }

        if !insert_table_infos.is_empty() {
            let insert_sst_level_id = *level_idx;
            let insert_sub_level_id = *l0_sub_level_id;
            if insert_sst_level_id == 0 {
                let l0 = &mut self.l0;
                let index = l0
                    .sub_levels
                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
                assert!(
                    index < l0.sub_levels.len()
                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
                    "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
                    insert_sub_level_id,
                    delete_sst_ids_set,
                    l0.sub_levels
                        .iter()
                        .map(|level| level.sub_level_id)
                        .collect_vec()
                );
                if l0.sub_levels[index].table_infos.is_empty()
                    && member_table_ids.len() == 1
                    && insert_table_infos.iter().all(|sst| {
                        sst.table_ids.len() == 1
                            && sst.table_ids[0]
                                == *member_table_ids.iter().next().expect("non-empty")
                    })
                {
                    // Only change vnode_partition_count for a group that has a single state table,
                    // and only for a level whose SST files are all updated by this compact task.
                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
                }
                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
            } else {
                let idx = insert_sst_level_id as usize - 1;
                if self.levels[idx].table_infos.is_empty()
                    && insert_table_infos
                        .iter()
                        .all(|sst| sst.table_ids.len() == 1)
                {
                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
                } else if self.levels[idx].vnode_partition_count != 0
                    && new_vnode_partition_count == 0
                    && member_table_ids.len() > 1
                {
                    self.levels[idx].vnode_partition_count = 0;
                }
                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
            }
        }
    }

    pub(crate) fn post_apply_l0_compact(&mut self) {
        {
            self.l0
                .sub_levels
                .retain(|level| !level.table_infos.is_empty());
            self.l0.total_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.total_file_size)
                .sum::<u64>();
            self.l0.uncompressed_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.uncompressed_file_size)
                .sum::<u64>();
        }
    }
}

impl<T, L> HummockVersionCommon<T, L> {
    pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
        self.levels
            .values()
            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
    }
}

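/// Builds an empty `Levels` for a newly created compaction group: `max_level` non-overlapping
/// levels (with `level_idx` starting at 1) plus an empty overlapping L0.
///
/// Minimal sketch, assuming `group_id` and `compaction_config` are already at hand:
///
/// ```ignore
/// let levels = build_initial_compaction_group_levels(group_id, &compaction_config);
/// assert_eq!(levels.levels.len() as u64, compaction_config.max_level);
/// assert!(levels.l0.sub_levels.is_empty());
/// ```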
pub fn build_initial_compaction_group_levels(
    group_id: CompactionGroupId,
    compaction_config: &CompactionConfig,
) -> Levels {
    let mut levels = vec![];
    for l in 0..compaction_config.get_max_level() {
        levels.push(Level {
            level_idx: (l + 1) as u32,
            level_type: PbLevelType::Nonoverlapping,
            table_infos: vec![],
            total_file_size: 0,
            sub_level_id: 0,
            uncompressed_file_size: 0,
            vnode_partition_count: 0,
        });
    }
    #[expect(deprecated)] // for backward-compatibility of previous hummock version delta
    Levels {
        levels,
        l0: OverlappingLevel {
            sub_levels: vec![],
            total_file_size: 0,
            uncompressed_file_size: 0,
        },
        group_id,
        parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
        member_table_ids: vec![],
        compaction_group_version_id: 0,
    }
}

fn split_sst_info_for_level(
    member_table_ids: &BTreeSet<TableId>,
    level: &mut Level,
    new_sst_id: &mut HummockSstableId,
) -> Vec<SstableInfo> {
    // Removing SSTs from a sub level may leave it empty. An empty sub level will be purged
    // whenever another compaction task finishes.
    let mut insert_table_infos = vec![];
    for sst_info in &mut level.table_infos {
        let removed_table_ids = sst_info
            .table_ids
            .iter()
            .filter(|table_id| member_table_ids.contains(table_id))
            .cloned()
            .collect_vec();
        let sst_size = sst_info.sst_size;
        if sst_size / 2 == 0 {
            tracing::warn!(
                id = %sst_info.sst_id,
                object_id = %sst_info.object_id,
                sst_size = sst_info.sst_size,
                file_size = sst_info.file_size,
                "Sstable sst_size is under expected",
            );
        };
        if !removed_table_ids.is_empty() {
            let (modified_sst, branch_sst) = split_sst_with_table_ids(
                sst_info,
                new_sst_id,
                sst_size / 2,
                sst_size / 2,
                member_table_ids.iter().cloned().collect_vec(),
            );
            *sst_info = modified_sst;
            insert_table_infos.push(branch_sst);
        }
    }
    insert_table_infos
}

/// Gets all compaction group ids.
pub fn get_compaction_group_ids(
    version: &HummockVersion,
) -> impl Iterator<Item = CompactionGroupId> + '_ {
    version.levels.keys().cloned()
}

pub fn get_table_compaction_group_id_mapping(
    version: &HummockVersion,
) -> HashMap<StateTableId, CompactionGroupId> {
    version
        .state_table_info
        .info()
        .iter()
        .map(|(table_id, info)| (*table_id, info.compaction_group_id))
        .collect()
}

/// Gets all SSTs in `group_id`.
pub fn get_compaction_group_ssts(
    version: &HummockVersion,
    group_id: CompactionGroupId,
) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
    let group_levels = version.get_compaction_group_levels(group_id);
    group_levels
        .l0
        .sub_levels
        .iter()
        .rev()
        .chain(group_levels.levels.iter())
        .flat_map(|level| {
            level
                .table_infos
                .iter()
                .map(|table_info| (table_info.object_id, table_info.sst_id))
        })
}

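/// Builds a new L0 sub level from `table_infos`, pre-computing its total and uncompressed file
/// sizes. For a `Nonoverlapping` sub level the SSTs must be concat-able, i.e. sorted with
/// disjoint key ranges (checked by `can_concat` in debug builds).
///
/// Hedged sketch with placeholder inputs:
///
/// ```ignore
/// let sub_level = new_sub_level(sub_level_id, PbLevelType::Overlapping, ssts);
/// assert_eq!(sub_level.level_idx, 0);
/// assert_eq!(sub_level.sub_level_id, sub_level_id);
/// ```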
1295pub fn new_sub_level(
1296    sub_level_id: u64,
1297    level_type: PbLevelType,
1298    table_infos: Vec<SstableInfo>,
1299) -> Level {
1300    if level_type == PbLevelType::Nonoverlapping {
1301        debug_assert!(
1302            can_concat(&table_infos),
1303            "sst of non-overlapping level is not concat-able: {:?}",
1304            table_infos
1305        );
1306    }
1307    let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1308    let uncompressed_file_size = table_infos
1309        .iter()
1310        .map(|table| table.uncompressed_file_size)
1311        .sum();
1312    Level {
1313        level_idx: 0,
1314        level_type,
1315        table_infos,
1316        total_file_size,
1317        sub_level_id,
1318        uncompressed_file_size,
1319        vnode_partition_count: 0,
1320    }
1321}
1322
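/// Appends `insert_table_infos` to the existing sub level at `sub_level_idx` and updates the
/// size statistics of both that sub level and `l0`. For a non-overlapping sub level, the SSTs
/// are re-sorted by key range and asserted to remain concatenable.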
1323pub fn add_ssts_to_sub_level(
1324    l0: &mut OverlappingLevel,
1325    sub_level_idx: usize,
1326    insert_table_infos: Vec<SstableInfo>,
1327) {
1328    insert_table_infos.iter().for_each(|sst| {
1329        l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1330        l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1331        l0.total_file_size += sst.sst_size;
1332        l0.uncompressed_file_size += sst.uncompressed_file_size;
1333    });
1334    l0.sub_levels[sub_level_idx]
1335        .table_infos
1336        .extend(insert_table_infos);
1337    if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1338        l0.sub_levels[sub_level_idx]
1339            .table_infos
1340            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1341        assert!(
1342            can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1343            "sstable ids: {:?}",
1344            l0.sub_levels[sub_level_idx]
1345                .table_infos
1346                .iter()
1347                .map(|sst| sst.sst_id)
1348                .collect_vec()
1349        );
1350    }
1351}
1352
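/// Inserts a new sub level with id `insert_sub_level_id` into `l0`, either at the position given
/// by `sub_level_insert_hint` or appended to the end. Sub levels are expected to stay ordered by
/// sub level id, which is checked by debug assertions.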
1353/// `None` value of `sub_level_insert_hint` means append.
1354pub fn insert_new_sub_level(
1355    l0: &mut OverlappingLevel,
1356    insert_sub_level_id: u64,
1357    level_type: PbLevelType,
1358    insert_table_infos: Vec<SstableInfo>,
1359    sub_level_insert_hint: Option<usize>,
1360) {
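    // A sub level id of `u64::MAX` is treated as a sentinel meaning there is nothing to insert.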
1361    if insert_sub_level_id == u64::MAX {
1362        return;
1363    }
1364    let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1365        insert_pos
1366    } else {
1367        if let Some(newest_level) = l0.sub_levels.last() {
1368            assert!(
1369                newest_level.sub_level_id < insert_sub_level_id,
1370                "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1371                newest_level.sub_level_id,
1372                insert_sub_level_id,
1373                l0,
1374            );
1375        }
1376        l0.sub_levels.len()
1377    };
1378    #[cfg(debug_assertions)]
1379    {
1380        if insert_pos > 0
1381            && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1382        {
1383            debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1384        }
1385        if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1386            debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1387        }
1388    }
1389    // All files will be committed in one new Overlapping sub-level and become
1390    // Nonoverlapping after at least one compaction.
1391    let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1392    l0.total_file_size += level.total_file_size;
1393    l0.uncompressed_file_size += level.uncompressed_file_size;
1394    l0.sub_levels.insert(insert_pos, level);
1395}
1396
1397/// Deletes sstables whose sst id is in the given id set.
1398///
1399/// Returns `true` if some sst is deleted, and `false` if the deletion is trivial (nothing is removed).
1400fn level_delete_ssts(
1401    operand: &mut Level,
1402    delete_sst_ids_superset: &HashSet<HummockSstableId>,
1403) -> bool {
1404    let original_len = operand.table_infos.len();
1405    operand
1406        .table_infos
1407        .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1408    operand.total_file_size = operand
1409        .table_infos
1410        .iter()
1411        .map(|table| table.sst_size)
1412        .sum::<u64>();
1413    operand.uncompressed_file_size = operand
1414        .table_infos
1415        .iter()
1416        .map(|table| table.uncompressed_file_size)
1417        .sum::<u64>();
1418    original_len != operand.table_infos.len()
1419}
1420
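/// Inserts `insert_table_infos` into `operand` and updates its size statistics. The level's
/// SSTs are kept sorted by key range, and `can_concat` is asserted after the insertion.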
1421fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1422    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1423        format!(
1424            "sstable ids: {:?}",
1425            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1426        )
1427    }
1428    operand.total_file_size += insert_table_infos
1429        .iter()
1430        .map(|sst| sst.sst_size)
1431        .sum::<u64>();
1432    operand.uncompressed_file_size += insert_table_infos
1433        .iter()
1434        .map(|sst| sst.uncompressed_file_size)
1435        .sum::<u64>();
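    // An overlapping level is converted to non-overlapping here: existing and inserted SSTs are
    // merged, re-sorted by key range, and must be concatenable afterwards.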
1436    if operand.level_type == PbLevelType::Overlapping {
1437        operand.level_type = PbLevelType::Nonoverlapping;
1438        operand
1439            .table_infos
1440            .extend(insert_table_infos.iter().cloned());
1441        operand
1442            .table_infos
1443            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1444        assert!(
1445            can_concat(&operand.table_infos),
1446            "{}",
1447            display_sstable_infos(&operand.table_infos)
1448        );
1449    } else if !insert_table_infos.is_empty() {
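        // Sort the inserted batch and try to splice it into the level as one contiguous run;
        // fall back to per-SST insertion if the batch would interleave with existing SSTs.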
1450        let sorted_insert: Vec<_> = insert_table_infos
1451            .iter()
1452            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1453            .cloned()
1454            .collect();
1455        let first = &sorted_insert[0];
1456        let last = &sorted_insert[sorted_insert.len() - 1];
1457        let pos = operand
1458            .table_infos
1459            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1460        if pos >= operand.table_infos.len()
1461            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1462        {
1463            operand.table_infos.splice(pos..pos, sorted_insert);
1464            // Validate the inserted SST batch together with the SSTs immediately before and after it.
1465            let validate_range = operand
1466                .table_infos
1467                .iter()
1468                .skip(pos.saturating_sub(1))
1469                .take(insert_table_infos.len() + 2)
1470                .collect_vec();
1471            assert!(
1472                can_concat(&validate_range),
1473                "{}",
1474                display_sstable_infos(&validate_range),
1475            );
1476        } else {
1477            // If this branch is reached, it indicates some unexpected behavior in compaction.
1478            // Here we issue a warning and fall back to insert one by one.
1479            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1480            for i in insert_table_infos {
1481                let pos = operand
1482                    .table_infos
1483                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1484                operand.table_infos.insert(pos, i.clone());
1485            }
1486            assert!(
1487                can_concat(&operand.table_infos),
1488                "{}",
1489                display_sstable_infos(&operand.table_infos)
1490            );
1491        }
1492    }
1493}
1494
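/// Collects the size of every object referenced by `version`, keyed by `HummockObjectId`.
/// This covers SSTs in all compaction group levels, SSTs referenced by the table change log,
/// and vector index objects.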
1495pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1496    // DO NOT REMOVE THIS LINE
1497    // This is to ensure that when adding new variant to `HummockObjectId`,
1498    // the compiler will warn us if we forget to handle it here.
1499    match HummockObjectId::Sstable(0.into()) {
1500        HummockObjectId::Sstable(_) => {}
1501        HummockObjectId::VectorFile(_) => {}
1502        HummockObjectId::HnswGraphFile(_) => {}
1503    };
1504    version
1505        .levels
1506        .values()
1507        .flat_map(|cg| {
1508            cg.level0()
1509                .sub_levels
1510                .iter()
1511                .chain(cg.levels.iter())
1512                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1513        })
1514        .chain(version.table_change_log.values().flat_map(|c| {
1515            c.iter().flat_map(|l| {
1516                l.old_value
1517                    .iter()
1518                    .chain(l.new_value.iter())
1519                    .map(|t| (t.object_id, t.file_size))
1520            })
1521        }))
1522        .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1523        .chain(
1524            version
1525                .vector_indexes
1526                .values()
1527                .flat_map(|index| index.get_objects()),
1528        )
1529        .collect()
1530}
1531
1532/// Verify the validity of a `HummockVersion` and return a list of violations if any.
1533/// Currently this method is only used by risectl validate-version.
1534pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1535    let mut res = Vec::new();
1536    // Validate each compaction group
1537    for (group_id, levels) in &version.levels {
1538        // Ensure compaction group id matches
1539        if levels.group_id != *group_id {
1540            res.push(format!(
1541                "GROUP {}: inconsistent group id {} in Levels",
1542                group_id, levels.group_id
1543            ));
1544        }
1545
1546        let validate_level = |group: CompactionGroupId,
1547                              expected_level_idx: u32,
1548                              level: &Level,
1549                              res: &mut Vec<String>| {
1550            let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1551            if level.level_idx == 0 {
1552                level_identifier.push_str(format!(" SUBLEVEL {}", level.sub_level_id).as_str());
1553                // Ensure sub-level is not empty
1554                if level.table_infos.is_empty() {
1555                    res.push(format!("{}: empty level", level_identifier));
1556                }
1557            } else if level.level_type != PbLevelType::Nonoverlapping {
1558                // Ensure non-L0 level is non-overlapping level
1559                res.push(format!(
1560                    "{}: level type {:?} is not non-overlapping",
1561                    level_identifier, level.level_type
1562                ));
1563            }
1564
1565            // Ensure level idx matches
1566            if level.level_idx != expected_level_idx {
1567                res.push(format!(
1568                    "{}: mismatched level idx, expected {}",
1569                    level_identifier, expected_level_idx
1570                ));
1571            }
1572
1573            let mut prev_table_info: Option<&SstableInfo> = None;
1574            for table_info in &level.table_infos {
1575                // Ensure table_ids are sorted and unique
1576                if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1577                    res.push(format!(
1578                        "{} SST {}: table_ids not sorted",
1579                        level_identifier, table_info.object_id
1580                    ));
1581                }
1582
1583                // Ensure SSTs in non-overlapping level have non-overlapping key range
1584                if level.level_type == PbLevelType::Nonoverlapping {
1585                    if let Some(prev) = prev_table_info.take()
1586                        && prev
1587                            .key_range
1588                            .compare_right_with(&table_info.key_range.left)
1589                            != Ordering::Less
1590                    {
1591                        res.push(format!(
1592                            "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1593                            level_identifier, table_info.object_id, prev, table_info
1594                        ));
1595                    }
1596                    let _ = prev_table_info.insert(table_info);
1597                }
1598            }
1599        };
1600
1601        let l0 = &levels.l0;
1602        let mut prev_sub_level_id = u64::MAX;
1603        for sub_level in &l0.sub_levels {
1604            // Ensure sub_level_id is sorted and unique
1605            if sub_level.sub_level_id >= prev_sub_level_id {
1606                res.push(format!(
1607                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1608                    group_id, sub_level.sub_level_id, prev_sub_level_id
1609                ));
1610            }
1611            prev_sub_level_id = sub_level.sub_level_id;
1612
1613            validate_level(*group_id, 0, sub_level, &mut res);
1614        }
1615
1616        for idx in 1..=levels.levels.len() {
1617            validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1618        }
1619    }
1620    res
1621}
1622
1623#[cfg(test)]
1624mod tests {
1625    use std::collections::{HashMap, HashSet};
1626
1627    use bytes::Bytes;
1628    use risingwave_common::catalog::TableId;
1629    use risingwave_common::hash::VirtualNode;
1630    use risingwave_common::util::epoch::test_epoch;
1631    use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1632
1633    use super::group_split;
1634    use crate::HummockVersionId;
1635    use crate::compaction_group::group_split::*;
1636    use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1637    use crate::key::{FullKey, gen_key_from_str};
1638    use crate::key_range::KeyRange;
1639    use crate::level::{Level, Levels, OverlappingLevel};
1640    use crate::sstable_info::{SstableInfo, SstableInfoInner};
1641    use crate::version::{
1642        GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1643    };
1644
1645    fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1646        gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1647    }
1648
1649    fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1650        let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1651        let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1652        let full_key_l = FullKey::for_test(
1653            TableId::new(*table_ids.first().unwrap()),
1654            table_key_l,
1655            epoch,
1656        )
1657        .encode();
1658        let full_key_r =
1659            FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1660                .encode();
1661
1662        SstableInfoInner {
1663            sst_id: sst_id.into(),
1664            key_range: KeyRange {
1665                left: full_key_l.into(),
1666                right: full_key_r.into(),
1667                right_exclusive: false,
1668            },
1669            table_ids: table_ids.into_iter().map(Into::into).collect(),
1670            object_id: sst_id.into(),
1671            min_epoch: 20,
1672            max_epoch: 20,
1673            file_size: 100,
1674            sst_size: 100,
1675            ..Default::default()
1676        }
1677    }
1678
1679    #[test]
1680    fn test_get_sst_object_ids() {
1681        let mut version = HummockVersion {
1682            id: HummockVersionId::new(0),
1683            levels: HashMap::from_iter([(
1684                0,
1685                Levels {
1686                    levels: vec![],
1687                    l0: OverlappingLevel {
1688                        sub_levels: vec![],
1689                        total_file_size: 0,
1690                        uncompressed_file_size: 0,
1691                    },
1692                    ..Default::default()
1693                },
1694            )]),
1695            ..Default::default()
1696        };
1697        assert_eq!(version.get_object_ids(false).count(), 0);
1698
1699        // Add to sub level
1700        version
1701            .levels
1702            .get_mut(&0)
1703            .unwrap()
1704            .l0
1705            .sub_levels
1706            .push(Level {
1707                table_infos: vec![
1708                    SstableInfoInner {
1709                        object_id: 11.into(),
1710                        sst_id: 11.into(),
1711                        ..Default::default()
1712                    }
1713                    .into(),
1714                ],
1715                ..Default::default()
1716            });
1717        assert_eq!(version.get_object_ids(false).count(), 1);
1718
1719        // Add to non sub level
1720        version.levels.get_mut(&0).unwrap().levels.push(Level {
1721            table_infos: vec![
1722                SstableInfoInner {
1723                    object_id: 22.into(),
1724                    sst_id: 22.into(),
1725                    ..Default::default()
1726                }
1727                .into(),
1728            ],
1729            ..Default::default()
1730        });
1731        assert_eq!(version.get_object_ids(false).count(), 2);
1732    }
1733
1734    #[test]
1735    fn test_apply_version_delta() {
1736        let mut version = HummockVersion {
1737            id: HummockVersionId::new(0),
1738            levels: HashMap::from_iter([
1739                (
1740                    0,
1741                    build_initial_compaction_group_levels(
1742                        0,
1743                        &CompactionConfig {
1744                            max_level: 6,
1745                            ..Default::default()
1746                        },
1747                    ),
1748                ),
1749                (
1750                    1,
1751                    build_initial_compaction_group_levels(
1752                        1,
1753                        &CompactionConfig {
1754                            max_level: 6,
1755                            ..Default::default()
1756                        },
1757                    ),
1758                ),
1759            ]),
1760            ..Default::default()
1761        };
1762        let version_delta = HummockVersionDelta {
1763            id: HummockVersionId::new(1),
1764            group_deltas: HashMap::from_iter([
1765                (
1766                    2,
1767                    GroupDeltas {
1768                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1769                            group_config: Some(CompactionConfig {
1770                                max_level: 6,
1771                                ..Default::default()
1772                            }),
1773                            ..Default::default()
1774                        }))],
1775                    },
1776                ),
1777                (
1778                    0,
1779                    GroupDeltas {
1780                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1781                    },
1782                ),
1783                (
1784                    1,
1785                    GroupDeltas {
1786                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1787                            1,
1788                            0,
1789                            HashSet::new(),
1790                            vec![
1791                                SstableInfoInner {
1792                                    object_id: 1.into(),
1793                                    sst_id: 1.into(),
1794                                    ..Default::default()
1795                                }
1796                                .into(),
1797                            ],
1798                            0,
1799                            version
1800                                .levels
1801                                .get(&1)
1802                                .as_ref()
1803                                .unwrap()
1804                                .compaction_group_version_id,
1805                        ))],
1806                    },
1807                ),
1808            ]),
1809            ..Default::default()
1810        };
1811        let version_delta = version_delta;
1812
1813        version.apply_version_delta(&version_delta);
1814        let mut cg1 = build_initial_compaction_group_levels(
1815            1,
1816            &CompactionConfig {
1817                max_level: 6,
1818                ..Default::default()
1819            },
1820        );
1821        cg1.levels[0] = Level {
1822            level_idx: 1,
1823            level_type: LevelType::Nonoverlapping,
1824            table_infos: vec![
1825                SstableInfoInner {
1826                    object_id: 1.into(),
1827                    sst_id: 1.into(),
1828                    ..Default::default()
1829                }
1830                .into(),
1831            ],
1832            ..Default::default()
1833        };
1834        assert_eq!(
1835            version,
1836            HummockVersion {
1837                id: HummockVersionId::new(1),
1838                levels: HashMap::from_iter([
1839                    (
1840                        2,
1841                        build_initial_compaction_group_levels(
1842                            2,
1843                            &CompactionConfig {
1844                                max_level: 6,
1845                                ..Default::default()
1846                            },
1847                        ),
1848                    ),
1849                    (1, cg1),
1850                ]),
1851                ..Default::default()
1852            }
1853        );
1854    }
1855
1856    fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1857        gen_sst_info_impl(object_id, table_ids, left, right).into()
1858    }
1859
1860    fn gen_sst_info_impl(
1861        object_id: u64,
1862        table_ids: Vec<u32>,
1863        left: Bytes,
1864        right: Bytes,
1865    ) -> SstableInfoInner {
1866        SstableInfoInner {
1867            object_id: object_id.into(),
1868            sst_id: object_id.into(),
1869            key_range: KeyRange {
1870                left,
1871                right,
1872                right_exclusive: false,
1873            },
1874            table_ids: table_ids.into_iter().map(Into::into).collect(),
1875            file_size: 100,
1876            sst_size: 100,
1877            uncompressed_file_size: 100,
1878            ..Default::default()
1879        }
1880    }
1881
1882    #[test]
1883    fn test_merge_levels() {
1884        let mut left_levels = build_initial_compaction_group_levels(
1885            1,
1886            &CompactionConfig {
1887                max_level: 6,
1888                ..Default::default()
1889            },
1890        );
1891
1892        let mut right_levels = build_initial_compaction_group_levels(
1893            2,
1894            &CompactionConfig {
1895                max_level: 6,
1896                ..Default::default()
1897            },
1898        );
1899
1900        left_levels.levels[0] = Level {
1901            level_idx: 1,
1902            level_type: LevelType::Nonoverlapping,
1903            table_infos: vec![
1904                gen_sst_info(
1905                    1,
1906                    vec![3],
1907                    FullKey::for_test(
1908                        TableId::new(3),
1909                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1910                        0,
1911                    )
1912                    .encode()
1913                    .into(),
1914                    FullKey::for_test(
1915                        TableId::new(3),
1916                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1917                        0,
1918                    )
1919                    .encode()
1920                    .into(),
1921                ),
1922                gen_sst_info(
1923                    10,
1924                    vec![3, 4],
1925                    FullKey::for_test(
1926                        TableId::new(3),
1927                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1928                        0,
1929                    )
1930                    .encode()
1931                    .into(),
1932                    FullKey::for_test(
1933                        TableId::new(4),
1934                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1935                        0,
1936                    )
1937                    .encode()
1938                    .into(),
1939                ),
1940                gen_sst_info(
1941                    11,
1942                    vec![4],
1943                    FullKey::for_test(
1944                        TableId::new(4),
1945                        gen_key_from_str(VirtualNode::from_index(11), "1"),
1946                        0,
1947                    )
1948                    .encode()
1949                    .into(),
1950                    FullKey::for_test(
1951                        TableId::new(4),
1952                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1953                        0,
1954                    )
1955                    .encode()
1956                    .into(),
1957                ),
1958            ],
1959            total_file_size: 300,
1960            ..Default::default()
1961        };
1962
1963        left_levels.l0.sub_levels.push(Level {
1964            level_idx: 0,
1965            table_infos: vec![gen_sst_info(
1966                3,
1967                vec![3],
1968                FullKey::for_test(
1969                    TableId::new(3),
1970                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1971                    0,
1972                )
1973                .encode()
1974                .into(),
1975                FullKey::for_test(
1976                    TableId::new(3),
1977                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1978                    0,
1979                )
1980                .encode()
1981                .into(),
1982            )],
1983            sub_level_id: 101,
1984            level_type: LevelType::Overlapping,
1985            total_file_size: 100,
1986            ..Default::default()
1987        });
1988
1989        left_levels.l0.sub_levels.push(Level {
1990            level_idx: 0,
1991            table_infos: vec![gen_sst_info(
1992                3,
1993                vec![3],
1994                FullKey::for_test(
1995                    TableId::new(3),
1996                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1997                    0,
1998                )
1999                .encode()
2000                .into(),
2001                FullKey::for_test(
2002                    TableId::new(3),
2003                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2004                    0,
2005                )
2006                .encode()
2007                .into(),
2008            )],
2009            sub_level_id: 103,
2010            level_type: LevelType::Overlapping,
2011            total_file_size: 100,
2012            ..Default::default()
2013        });
2014
2015        left_levels.l0.sub_levels.push(Level {
2016            level_idx: 0,
2017            table_infos: vec![gen_sst_info(
2018                3,
2019                vec![3],
2020                FullKey::for_test(
2021                    TableId::new(3),
2022                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2023                    0,
2024                )
2025                .encode()
2026                .into(),
2027                FullKey::for_test(
2028                    TableId::new(3),
2029                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2030                    0,
2031                )
2032                .encode()
2033                .into(),
2034            )],
2035            sub_level_id: 105,
2036            level_type: LevelType::Nonoverlapping,
2037            total_file_size: 100,
2038            ..Default::default()
2039        });
2040
2041        right_levels.levels[0] = Level {
2042            level_idx: 1,
2043            level_type: LevelType::Nonoverlapping,
2044            table_infos: vec![
2045                gen_sst_info(
2046                    1,
2047                    vec![5],
2048                    FullKey::for_test(
2049                        TableId::new(5),
2050                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2051                        0,
2052                    )
2053                    .encode()
2054                    .into(),
2055                    FullKey::for_test(
2056                        TableId::new(5),
2057                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2058                        0,
2059                    )
2060                    .encode()
2061                    .into(),
2062                ),
2063                gen_sst_info(
2064                    10,
2065                    vec![5, 6],
2066                    FullKey::for_test(
2067                        TableId::new(5),
2068                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2069                        0,
2070                    )
2071                    .encode()
2072                    .into(),
2073                    FullKey::for_test(
2074                        TableId::new(6),
2075                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2076                        0,
2077                    )
2078                    .encode()
2079                    .into(),
2080                ),
2081                gen_sst_info(
2082                    11,
2083                    vec![6],
2084                    FullKey::for_test(
2085                        TableId::new(6),
2086                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2087                        0,
2088                    )
2089                    .encode()
2090                    .into(),
2091                    FullKey::for_test(
2092                        TableId::new(6),
2093                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2094                        0,
2095                    )
2096                    .encode()
2097                    .into(),
2098                ),
2099            ],
2100            total_file_size: 300,
2101            ..Default::default()
2102        };
2103
2104        right_levels.l0.sub_levels.push(Level {
2105            level_idx: 0,
2106            table_infos: vec![gen_sst_info(
2107                3,
2108                vec![5],
2109                FullKey::for_test(
2110                    TableId::new(5),
2111                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2112                    0,
2113                )
2114                .encode()
2115                .into(),
2116                FullKey::for_test(
2117                    TableId::new(5),
2118                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2119                    0,
2120                )
2121                .encode()
2122                .into(),
2123            )],
2124            sub_level_id: 101,
2125            level_type: LevelType::Overlapping,
2126            total_file_size: 100,
2127            ..Default::default()
2128        });
2129
2130        right_levels.l0.sub_levels.push(Level {
2131            level_idx: 0,
2132            table_infos: vec![gen_sst_info(
2133                5,
2134                vec![5],
2135                FullKey::for_test(
2136                    TableId::new(5),
2137                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2138                    0,
2139                )
2140                .encode()
2141                .into(),
2142                FullKey::for_test(
2143                    TableId::new(5),
2144                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2145                    0,
2146                )
2147                .encode()
2148                .into(),
2149            )],
2150            sub_level_id: 102,
2151            level_type: LevelType::Overlapping,
2152            total_file_size: 100,
2153            ..Default::default()
2154        });
2155
2156        right_levels.l0.sub_levels.push(Level {
2157            level_idx: 0,
2158            table_infos: vec![gen_sst_info(
2159                3,
2160                vec![5],
2161                FullKey::for_test(
2162                    TableId::new(5),
2163                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2164                    0,
2165                )
2166                .encode()
2167                .into(),
2168                FullKey::for_test(
2169                    TableId::new(5),
2170                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2171                    0,
2172                )
2173                .encode()
2174                .into(),
2175            )],
2176            sub_level_id: 103,
2177            level_type: LevelType::Nonoverlapping,
2178            total_file_size: 100,
2179            ..Default::default()
2180        });
2181
2182        {
2183            // test empty
2184            let mut left_levels = Levels::default();
2185            let right_levels = Levels::default();
2186
2187            group_split::merge_levels(&mut left_levels, right_levels);
2188        }
2189
2190        {
2191            // test empty left
2192            let mut left_levels = build_initial_compaction_group_levels(
2193                1,
2194                &CompactionConfig {
2195                    max_level: 6,
2196                    ..Default::default()
2197                },
2198            );
2199            let right_levels = right_levels.clone();
2200
2201            group_split::merge_levels(&mut left_levels, right_levels);
2202
2203            assert!(left_levels.l0.sub_levels.len() == 3);
2204            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2205            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2206            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2207            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2208            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2209            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2210
2211            assert!(left_levels.levels[0].level_idx == 1);
2212            assert_eq!(300, left_levels.levels[0].total_file_size);
2213        }
2214
2215        {
2216            // test empty right
2217            let mut left_levels = left_levels.clone();
2218            let right_levels = build_initial_compaction_group_levels(
2219                2,
2220                &CompactionConfig {
2221                    max_level: 6,
2222                    ..Default::default()
2223                },
2224            );
2225
2226            group_split::merge_levels(&mut left_levels, right_levels);
2227
2228            assert!(left_levels.l0.sub_levels.len() == 3);
2229            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2230            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2231            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2232            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2233            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2234            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2235
2236            assert!(left_levels.levels[0].level_idx == 1);
2237            assert_eq!(300, left_levels.levels[0].total_file_size);
2238        }
2239
2240        {
2241            let mut left_levels = left_levels.clone();
2242            let right_levels = right_levels.clone();
2243
2244            group_split::merge_levels(&mut left_levels, right_levels);
2245
2246            assert!(left_levels.l0.sub_levels.len() == 6);
2247            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2248            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2249            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2250            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2251            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2252            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2253            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2254            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2255            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2256            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2257            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2258            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2259
2260            assert!(left_levels.levels[0].level_idx == 1);
2261            assert_eq!(600, left_levels.levels[0].total_file_size);
2262        }
2263    }
2264
2265    #[test]
2266    fn test_get_split_pos() {
2267        let epoch = test_epoch(1);
2268        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2269        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2270        let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2271
2272        let ssts = vec![s1, s2, s3];
2273        let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2274
2275        let pos = group_split::get_split_pos(&ssts, split_key.clone());
2276        assert_eq!(1, pos);
2277
2278        let pos = group_split::get_split_pos(&vec![], split_key);
2279        assert_eq!(0, pos);
2280    }
2281
2282    #[test]
2283    fn test_split_sst() {
2284        let epoch = test_epoch(1);
2285        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2286
2287        {
2288            let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
2289            let origin_sst = sst.clone();
2290            let sst_size = origin_sst.sst_size;
2291            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2292            assert_eq!(SstSplitType::Both, split_type);
2293
2294            let mut new_sst_id = 10.into();
2295            let (origin_sst, branched_sst) = group_split::split_sst(
2296                origin_sst,
2297                &mut new_sst_id,
2298                split_key,
2299                sst_size / 2,
2300                sst_size / 2,
2301            );
2302
2303            let origin_sst = origin_sst.unwrap();
2304            let branched_sst = branched_sst.unwrap();
2305
2306            assert!(origin_sst.key_range.right_exclusive);
2307            assert!(
2308                origin_sst
2309                    .key_range
2310                    .right
2311                    .cmp(&branched_sst.key_range.left)
2312                    .is_le()
2313            );
2314            assert!(origin_sst.table_ids.is_sorted());
2315            assert!(branched_sst.table_ids.is_sorted());
2316            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2317            assert!(branched_sst.sst_size < origin_sst.file_size);
2318            assert_eq!(10, branched_sst.sst_id);
2319            assert_eq!(11, origin_sst.sst_id);
2320            assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2321        }
2322
2323        {
2324            // test a split key whose table_id does not exist in the sst
2325            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2326            let origin_sst = sst.clone();
2327            let sst_size = origin_sst.sst_size;
2328            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2329            assert_eq!(SstSplitType::Both, split_type);
2330
2331            let mut new_sst_id = 10.into();
2332            let (origin_sst, branched_sst) = group_split::split_sst(
2333                origin_sst,
2334                &mut new_sst_id,
2335                split_key,
2336                sst_size / 2,
2337                sst_size / 2,
2338            );
2339
2340            let origin_sst = origin_sst.unwrap();
2341            let branched_sst = branched_sst.unwrap();
2342
2343            assert!(origin_sst.key_range.right_exclusive);
2344            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2345            assert!(origin_sst.table_ids.is_sorted());
2346            assert!(branched_sst.table_ids.is_sorted());
2347            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2348            assert!(branched_sst.sst_size < origin_sst.file_size);
2349            assert_eq!(10, branched_sst.sst_id);
2350            assert_eq!(11, origin_sst.sst_id);
2351            assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2352        }
2353
2354        {
2355            let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
2356            let split_type = group_split::need_to_split(&sst, split_key);
2357            assert_eq!(SstSplitType::Left, split_type);
2358        }
2359
2360        {
2361            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2362            let origin_sst = sst.clone();
2363            let split_type = group_split::need_to_split(&origin_sst, split_key);
2364            assert_eq!(SstSplitType::Both, split_type);
2365
2366            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2367            let origin_sst = sst;
2368            let split_type = group_split::need_to_split(&origin_sst, split_key);
2369            assert_eq!(SstSplitType::Right, split_type);
2370        }
2371
2372        {
2373            // test key_range left = right
2374            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2375            sst.key_range.right = sst.key_range.left.clone();
2376            let sst: SstableInfo = sst.into();
2377            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2378            let origin_sst = sst;
2379            let sst_size = origin_sst.sst_size;
2380
2381            let mut new_sst_id = 10.into();
2382            let (origin_sst, branched_sst) = group_split::split_sst(
2383                origin_sst,
2384                &mut new_sst_id,
2385                split_key,
2386                sst_size / 2,
2387                sst_size / 2,
2388            );
2389
2390            assert!(origin_sst.is_none());
2391            assert!(branched_sst.is_some());
2392        }
2393    }
2394
2395    #[test]
2396    fn test_split_sst_info_for_level() {
2397        let mut version = HummockVersion {
2398            id: HummockVersionId(0),
2399            levels: HashMap::from_iter([(
2400                1,
2401                build_initial_compaction_group_levels(
2402                    1,
2403                    &CompactionConfig {
2404                        max_level: 6,
2405                        ..Default::default()
2406                    },
2407                ),
2408            )]),
2409            ..Default::default()
2410        };
2411
2412        let cg1 = version.levels.get_mut(&1).unwrap();
2413
2414        cg1.levels[0] = Level {
2415            level_idx: 1,
2416            level_type: LevelType::Nonoverlapping,
2417            table_infos: vec![
2418                gen_sst_info(
2419                    1,
2420                    vec![3],
2421                    FullKey::for_test(
2422                        TableId::new(3),
2423                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2424                        0,
2425                    )
2426                    .encode()
2427                    .into(),
2428                    FullKey::for_test(
2429                        TableId::new(3),
2430                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2431                        0,
2432                    )
2433                    .encode()
2434                    .into(),
2435                ),
2436                gen_sst_info(
2437                    10,
2438                    vec![3, 4],
2439                    FullKey::for_test(
2440                        TableId::new(3),
2441                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2442                        0,
2443                    )
2444                    .encode()
2445                    .into(),
2446                    FullKey::for_test(
2447                        TableId::new(4),
2448                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2449                        0,
2450                    )
2451                    .encode()
2452                    .into(),
2453                ),
2454                gen_sst_info(
2455                    11,
2456                    vec![4],
2457                    FullKey::for_test(
2458                        TableId::new(4),
2459                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2460                        0,
2461                    )
2462                    .encode()
2463                    .into(),
2464                    FullKey::for_test(
2465                        TableId::new(4),
2466                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2467                        0,
2468                    )
2469                    .encode()
2470                    .into(),
2471                ),
2472            ],
2473            total_file_size: 300,
2474            ..Default::default()
2475        };
2476
2477        cg1.l0.sub_levels.push(Level {
2478            level_idx: 0,
2479            table_infos: vec![
2480                gen_sst_info(
2481                    2,
2482                    vec![2],
2483                    FullKey::for_test(
2484                        TableId::new(0),
2485                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2486                        0,
2487                    )
2488                    .encode()
2489                    .into(),
2490                    FullKey::for_test(
2491                        TableId::new(2),
2492                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2493                        0,
2494                    )
2495                    .encode()
2496                    .into(),
2497                ),
2498                gen_sst_info(
2499                    22,
2500                    vec![2],
2501                    FullKey::for_test(
2502                        TableId::new(0),
2503                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2504                        0,
2505                    )
2506                    .encode()
2507                    .into(),
2508                    FullKey::for_test(
2509                        TableId::new(2),
2510                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2511                        0,
2512                    )
2513                    .encode()
2514                    .into(),
2515                ),
2516                gen_sst_info(
2517                    23,
2518                    vec![2],
2519                    FullKey::for_test(
2520                        TableId::new(0),
2521                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2522                        0,
2523                    )
2524                    .encode()
2525                    .into(),
2526                    FullKey::for_test(
2527                        TableId::new(2),
2528                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2529                        0,
2530                    )
2531                    .encode()
2532                    .into(),
2533                ),
2534                gen_sst_info(
2535                    24,
2536                    vec![2],
2537                    FullKey::for_test(
2538                        TableId::new(2),
2539                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2540                        0,
2541                    )
2542                    .encode()
2543                    .into(),
2544                    FullKey::for_test(
2545                        TableId::new(2),
2546                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2547                        0,
2548                    )
2549                    .encode()
2550                    .into(),
2551                ),
2552                gen_sst_info(
2553                    25,
2554                    vec![2],
2555                    FullKey::for_test(
2556                        TableId::new(0),
2557                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2558                        0,
2559                    )
2560                    .encode()
2561                    .into(),
2562                    FullKey::for_test(
2563                        TableId::new(0),
2564                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2565                        0,
2566                    )
2567                    .encode()
2568                    .into(),
2569                ),
2570            ],
2571            sub_level_id: 101,
2572            level_type: LevelType::Overlapping,
2573            total_file_size: 300,
2574            ..Default::default()
2575        });
2576
2577        {
2578            // split Overlapping level
2579            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2580
2581            let mut new_sst_id = 100.into();
2582            let x = group_split::split_sst_info_for_level_v2(
2583                &mut cg1.l0.sub_levels[0],
2584                &mut new_sst_id,
2585                split_key,
2586            );
2587            // assert_eq!(3, x.len());
2588            // assert_eq!(100, x[0].sst_id);
2589            // assert_eq!(100, x[0].sst_size);
2590            // assert_eq!(101, x[1].sst_id);
2591            // assert_eq!(100, x[1].sst_size);
2592            // assert_eq!(102, x[2].sst_id);
2593            // assert_eq!(100, x[2].sst_size);
2594
2595            let mut right_l0 = OverlappingLevel {
2596                sub_levels: vec![],
2597                total_file_size: 0,
2598                uncompressed_file_size: 0,
2599            };
2600
2601            right_l0.sub_levels.push(Level {
2602                level_idx: 0,
2603                table_infos: x,
2604                sub_level_id: 101,
2605                total_file_size: 100,
2606                level_type: LevelType::Overlapping,
2607                ..Default::default()
2608            });
2609
2610            let right_levels = Levels {
2611                levels: vec![],
2612                l0: right_l0,
2613                ..Default::default()
2614            };
2615
2616            merge_levels(cg1, right_levels);
2617        }
2618
2619        {
2620            // test split empty level
2621            let mut new_sst_id = 100.into();
2622            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2623            let x = group_split::split_sst_info_for_level_v2(
2624                &mut cg1.levels[2],
2625                &mut new_sst_id,
2626                split_key,
2627            );
2628
2629            assert!(x.is_empty());
2630        }
2631
2632        {
2633            // test split to right Nonoverlapping level
2634            let mut cg1 = cg1.clone();
2635            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2636
2637            let mut new_sst_id = 100.into();
2638            let x = group_split::split_sst_info_for_level_v2(
2639                &mut cg1.levels[0],
2640                &mut new_sst_id,
2641                split_key,
2642            );
2643
2644            assert_eq!(3, x.len());
2645            assert_eq!(1, x[0].sst_id);
2646            assert_eq!(100, x[0].sst_size);
2647            assert_eq!(10, x[1].sst_id);
2648            assert_eq!(100, x[1].sst_size);
2649            assert_eq!(11, x[2].sst_id);
2650            assert_eq!(100, x[2].sst_size);
2651
2652            assert_eq!(0, cg1.levels[0].table_infos.len());
2653        }
2654
2655        {
2656            // test split to left Nonoverlapping level
2657            let mut cg1 = cg1.clone();
2658            let split_key = group_split::build_split_key(5.into(), VirtualNode::ZERO);
2659
2660            let mut new_sst_id = 100.into();
2661            let x = group_split::split_sst_info_for_level_v2(
2662                &mut cg1.levels[0],
2663                &mut new_sst_id,
2664                split_key,
2665            );
2666
2667            assert_eq!(0, x.len());
2668            assert_eq!(3, cg1.levels[0].table_infos.len());
2669        }
2670
2671        // {
2672        //     // test split to both Nonoverlapping level
2673        //     let mut cg1 = cg1.clone();
2674        //     let split_key = build_split_key(3, VirtualNode::MAX);
2675
2676        //     let mut new_sst_id = 100;
2677        //     let x = group_split::split_sst_info_for_level_v2(
2678        //         &mut cg1.levels[0],
2679        //         &mut new_sst_id,
2680        //         split_key,
2681        //     );
2682
2683        //     assert_eq!(2, x.len());
2684        //     assert_eq!(100, x[0].sst_id);
2685        //     assert_eq!(100 / 2, x[0].sst_size);
2686        //     assert_eq!(11, x[1].sst_id);
2687        //     assert_eq!(100, x[1].sst_size);
2688        //     assert_eq!(vec![3, 4], x[0].table_ids);
2689
2690        //     assert_eq!(2, cg1.levels[0].table_infos.len());
2691        //     assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2692        //     assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2693        //     assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2694        // }
2695
2696        {
2697            // test split to both Nonoverlapping level
2698            let mut cg1 = cg1.clone();
2699            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2700
2701            let mut new_sst_id = 100.into();
2702            let x = group_split::split_sst_info_for_level_v2(
2703                &mut cg1.levels[0],
2704                &mut new_sst_id,
2705                split_key,
2706            );
2707
2708            assert_eq!(2, x.len());
2709            assert_eq!(100, x[0].sst_id);
2710            assert_eq!(100 / 2, x[0].sst_size);
2711            assert_eq!(11, x[1].sst_id);
2712            assert_eq!(100, x[1].sst_size);
2713            assert_eq!(vec![TableId::new(4)], x[1].table_ids);
2714
2715            assert_eq!(2, cg1.levels[0].table_infos.len());
2716            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2717            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2718            assert_eq!(
2719                vec![TableId::new(3)],
2720                cg1.levels[0].table_infos[1].table_ids
2721            );
2722        }
2723    }
2724}