risingwave_hummock_sdk/compaction_group/hummock_version_ext.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::iter::once;
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_pb::hummock::{
    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
};
use tracing::warn;

use super::group_split::split_sst_with_table_ids;
use super::{StateTableId, group_split};
use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
use crate::compact_task::is_compaction_task_expired;
use crate::compaction_group::StaticCompactionGroupId;
use crate::key_range::KeyRangeCommon;
use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
use crate::sstable_info::SstableInfo;
use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
use crate::vector_index::apply_vector_index_delta;
use crate::version::{
    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
};
use crate::{
    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
};

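/// Per-compaction-group summary of the SST changes carried by one version delta: the level that
/// receives new SSTs, the inserted SSTs themselves, and the object ids of removed SSTs. Built by
/// `build_sst_delta_infos`, e.g. for deciding which SSTs the refiller needs to handle (see the
/// trivial-move note there).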
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    pub insert_sst_level: u32,
    pub insert_sst_infos: Vec<SstableInfo>,
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}

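/// For one physical SST object, the branched SST ids referencing it, keyed by the compaction
/// group holding each branch. Only SSTs whose `sst_id` differs from their `object_id` are
/// considered branched (see `build_branched_sst_info`).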
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;

impl<L> HummockVersionCommon<SstableInfo, L> {
    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
        self.levels
            .get(&compaction_group_id)
            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
    }

    pub fn get_compaction_group_levels_mut(
        &mut self,
        compaction_group_id: CompactionGroupId,
    ) -> &mut Levels {
        self.levels
            .get_mut(&compaction_group_id)
            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
    }

    // Only scans the SST infos from the levels of the specified compaction group (the table change log is excluded).
    pub fn get_sst_ids_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> impl Iterator<Item = HummockSstableId> + '_ {
        self.levels
            .iter()
            .filter_map(move |(cg_id, level)| {
                if *cg_id == compaction_group_id {
                    Some(level)
                } else {
                    None
                }
            })
            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
            .flat_map(|level| level.table_infos.iter())
            .map(|s| s.sst_id)
    }

    pub fn level_iter<F: FnMut(&Level) -> bool>(
        &self,
        compaction_group_id: CompactionGroupId,
        mut f: F,
    ) {
        if let Some(levels) = self.levels.get(&compaction_group_id) {
            for sub_level in &levels.l0.sub_levels {
                if !f(sub_level) {
                    return;
                }
            }
            for level in &levels.levels {
                if !f(level) {
                    return;
                }
            }
        }
    }

    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
        // L0 is tracked separately from `levels`, hence the `+ 1`.
        self.levels
            .get(&compaction_group_id)
            .map(|group| group.levels.len() + 1)
            .unwrap_or(0)
    }

    pub fn safe_epoch_table_watermarks(
        &self,
        existing_table_ids: &[TableId],
    ) -> BTreeMap<TableId, TableWatermarks> {
        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
    }
}

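/// For each table in `existing_table_ids`, keeps only the earliest (safe-epoch) watermark entry
/// of its `TableWatermarks`; tables without watermarks or not in the list are dropped.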
pub fn safe_epoch_table_watermarks_impl(
    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
    existing_table_ids: &[TableId],
) -> BTreeMap<TableId, TableWatermarks> {
    fn extract_single_table_watermark(
        table_watermarks: &TableWatermarks,
    ) -> Option<TableWatermarks> {
        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
            Some(TableWatermarks {
                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
                direction: table_watermarks.direction,
                watermark_type: table_watermarks.watermark_type,
            })
        } else {
            None
        }
    }
    table_watermarks
        .iter()
        .filter_map(|(table_id, table_watermarks)| {
            if !existing_table_ids.contains(table_id) {
                None
            } else {
                extract_single_table_watermark(table_watermarks)
                    .map(|table_watermarks| (*table_id, table_watermarks))
            }
        })
        .collect()
}

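/// Converts single-epoch safe watermarks (as produced by `safe_epoch_table_watermarks_impl`) into
/// per-table `ReadTableWatermark`s, expanding each vnode bitmap into a per-vnode map and asserting
/// that no vnode carries more than one watermark.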
pub fn safe_epoch_read_table_watermarks_impl(
    safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
) -> BTreeMap<TableId, ReadTableWatermark> {
    safe_epoch_watermarks
        .into_iter()
        .map(|(table_id, watermarks)| {
            assert_eq!(watermarks.watermarks.len(), 1);
            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
            let mut vnode_watermark_map = BTreeMap::new();
            for vnode_watermark in vnode_watermarks.iter() {
                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                    assert!(
                        vnode_watermark_map
                            .insert(vnode, watermark.clone())
                            .is_none(),
                        "duplicate table watermark on vnode {}",
                        vnode.to_index()
                    );
                }
            }
            (
                table_id,
                ReadTableWatermark {
                    direction: watermarks.direction,
                    vnode_watermarks: vnode_watermark_map,
                },
            )
        })
        .collect()
}

impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
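    /// Estimates how many new SSTs a split of `parent_group_id` at `split_key` would create:
    /// every SST whose key range straddles the split key (`SstSplitType::Both`) is counted as two
    /// resulting SSTs, while SSTs falling entirely on one side need no split and are not counted.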
    pub fn count_new_ssts_in_group_split(
        &self,
        parent_group_id: CompactionGroupId,
        split_key: Bytes,
    ) -> u64 {
        self.levels
            .get(&parent_group_id)
            .map_or(0, |parent_levels| {
                let l0 = &parent_levels.l0;
                let mut split_count = 0;
                for sub_level in &l0.sub_levels {
                    assert!(!sub_level.table_infos.is_empty());

                    if sub_level.level_type == PbLevelType::Overlapping {
                        // TODO: use table_id / vnode / key_range filter
                        split_count += sub_level
                            .table_infos
                            .iter()
                            .map(|sst| {
                                if let group_split::SstSplitType::Both =
                                    group_split::need_to_split(sst, split_key.clone())
                                {
                                    2
                                } else {
                                    0
                                }
                            })
                            .sum::<u64>();
                        continue;
                    }

                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
                    let sst = sub_level.table_infos.get(pos).unwrap();

                    if let group_split::SstSplitType::Both =
                        group_split::need_to_split(sst, split_key.clone())
                    {
                        split_count += 2;
                    }
                }

                for level in &parent_levels.levels {
                    if level.table_infos.is_empty() {
                        continue;
                    }
                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
                    let sst = level.table_infos.get(pos).unwrap();
                    if let group_split::SstSplitType::Both =
                        group_split::need_to_split(sst, split_key.clone())
                    {
                        split_count += 2;
                    }
                }

                split_count
            })
    }

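    /// Initializes the levels of the new group `group_id` by splitting out of `parent_group_id`
    /// every SST that contains any of `member_table_ids`. Each affected SST yields a branch SST
    /// (with a fresh id starting from `new_sst_start_id`) inserted into the matching (sub-)level
    /// of the new group, and parent SSTs whose table id list becomes empty are dropped. Both
    /// groups bump `compaction_group_version_id` so that in-flight compaction tasks created
    /// before the split are treated as expired.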
    pub fn init_with_parent_group(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        member_table_ids: BTreeSet<StateTableId>,
        new_sst_start_id: HummockSstableId,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from",
                parent_group_id
            );
        }
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After a compaction group operation such as a split, any ongoing compaction task created
        // before the operation must be rejected as expired. By incrementing the group's
        // `compaction_group_version_id` and comparing it with the value recorded in the compaction
        // task, such expired tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Removing SSTs from a sub level may leave the sub level empty. Empty sub levels
                // are purged whenever another compaction task finishes.
                let insert_table_infos =
                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                if insert_table_infos.is_empty() {
                    continue;
                }
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos =
                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }

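    /// Derives one `SstDeltaInfo` per compaction group from `version_delta`, collecting the SSTs
    /// inserted at some level and the object ids of SSTs removed from the current version. Groups
    /// whose deltas are not purely intra-level / new-L0-sub-level changes are skipped, and a
    /// trivial-move delta yields no info at all since no SST object actually changes.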
    pub fn build_sst_delta_infos(
        &self,
        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
    ) -> Vec<SstDeltaInfo> {
        let mut infos = vec![];

        // Skip trivial-move deltas for the refiller: a trivial move only changes the position of
        // an SST within the LSM tree without modifying the underlying SST object, so the refill
        // does not need to be re-executed.
        if version_delta.trivial_move {
            return infos;
        }

        for (group_id, group_deltas) in &version_delta.group_deltas {
            let mut info = SstDeltaInfo::default();

            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();

            // Build the info only if all deltas in this group are intra-level or new-L0-sub-level deltas.
            if !group_deltas.group_deltas.iter().all(|delta| {
                matches!(
                    delta,
                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
                )
            }) {
                continue;
            }

            // TODO(MrCroxx): At most one insert delta is allowed here. It's okay for now with the
            // current `hummock::manager::gen_version_delta` implementation. Better refactor the
            // struct to reduce conventions.
            for group_delta in &group_deltas.group_deltas {
                match group_delta {
                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                        if !inserted_table_infos.is_empty() {
                            info.insert_sst_level = 0;
                            info.insert_sst_infos
                                .extend(inserted_table_infos.iter().cloned());
                        }
                    }
                    GroupDeltaCommon::IntraLevel(intra_level) => {
                        if !intra_level.inserted_table_infos.is_empty() {
                            info.insert_sst_level = intra_level.level_idx;
                            info.insert_sst_infos
                                .extend(intra_level.inserted_table_infos.iter().cloned());
                        }
                        if !intra_level.removed_table_ids.is_empty() {
                            for id in &intra_level.removed_table_ids {
                                if intra_level.level_idx == 0 {
                                    removed_l0_ssts.insert(*id);
                                } else {
                                    removed_ssts
                                        .entry(intra_level.level_idx)
                                        .or_default()
                                        .insert(*id);
                                }
                            }
                        }
                    }
                    GroupDeltaCommon::GroupConstruct(_)
                    | GroupDeltaCommon::GroupDestroy(_)
                    | GroupDeltaCommon::GroupMerge(_)
                    | GroupDeltaCommon::TruncateTables(_) => {}
                }
            }

            let group = self.levels.get(group_id).unwrap();
            for l0_sub_level in &group.level0().sub_levels {
                for sst_info in &l0_sub_level.table_infos {
                    if removed_l0_ssts.remove(&sst_info.sst_id) {
                        info.delete_sst_object_ids.push(sst_info.object_id);
                    }
                }
            }
            for level in &group.levels {
                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
                    for sst_info in &level.table_infos {
                        if removed_level_ssts.remove(&sst_info.sst_id) {
                            info.delete_sst_object_ids.push(sst_info.object_id);
                        }
                    }
                    if !removed_level_ssts.is_empty() {
                        tracing::error!(
                            "removed_level_ssts is not empty: {:?}",
                            removed_level_ssts,
                        );
                    }
                    debug_assert!(removed_level_ssts.is_empty());
                }
            }

            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
                tracing::error!(
                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
                    removed_l0_ssts,
                    removed_ssts
                );
            }
            debug_assert!(removed_l0_ssts.is_empty());
            debug_assert!(removed_ssts.is_empty());

            infos.push(info);
        }

        infos
    }

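    /// Applies `version_delta` to this version in place: state table info, per-group level changes
    /// (construct / destroy / merge / intra-level / new L0 sub level / truncate tables), table
    /// watermarks, the table change log, and vector indexes, then advances `self.id`.
    ///
    /// A minimal usage sketch (assuming a `version` and a `delta` with `delta.prev_id == version.id`):
    /// ```ignore
    /// version.apply_version_delta(&delta);
    /// assert_eq!(version.id, delta.id);
    /// ```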
    pub fn apply_version_delta(
        &mut self,
        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
    ) {
        assert_eq!(self.id, version_delta.prev_id);

        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
            &version_delta.state_table_info_delta,
            &version_delta.removed_table_ids,
        );

        #[expect(deprecated)]
        {
            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
                is_commit_epoch = true;
                tracing::trace!(
                    "max committed epoch bumped but no table committed epoch is changed"
                );
            }
        }

        // Apply to `levels`, i.e., the levels of each compaction group.
        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
            let mut is_applied_l0_compact = false;
            for group_delta in &group_deltas.group_deltas {
                match group_delta {
                    GroupDeltaCommon::GroupConstruct(group_construct) => {
                        let mut new_levels = build_initial_compaction_group_levels(
                            *compaction_group_id,
                            group_construct.get_group_config().unwrap(),
                        );
                        let parent_group_id = group_construct.parent_group_id;
                        new_levels.parent_group_id = parent_group_id;
                        #[expect(deprecated)]
                        // for backward-compatibility of previous hummock version delta
                        new_levels
                            .member_table_ids
                            .clone_from(&group_construct.table_ids);
                        self.levels.insert(*compaction_group_id, new_levels);
                        let member_table_ids = if group_construct.version()
                            >= CompatibilityVersion::NoMemberTableIds
                        {
                            self.state_table_info
                                .compaction_group_member_table_ids(*compaction_group_id)
                                .iter()
                                .copied()
                                .collect()
                        } else {
                            #[expect(deprecated)]
                            // for backward-compatibility of previous hummock version delta
                            BTreeSet::from_iter(
                                group_construct.table_ids.iter().copied().map(Into::into),
                            )
                        };

                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
                            let split_key = if group_construct.split_key.is_some() {
                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
                            } else {
                                None
                            };
                            self.init_with_parent_group_v2(
                                parent_group_id,
                                *compaction_group_id,
                                group_construct.get_new_sst_start_id().into(),
                                split_key.clone(),
                            );
                        } else {
                            // for backward-compatibility of previous hummock version delta
                            self.init_with_parent_group(
                                parent_group_id,
                                *compaction_group_id,
                                member_table_ids,
                                group_construct.get_new_sst_start_id().into(),
                            );
                        }
                    }
                    GroupDeltaCommon::GroupMerge(group_merge) => {
                        tracing::info!(
                            "group_merge left {:?} right {:?}",
                            group_merge.left_group_id,
                            group_merge.right_group_id
                        );
                        self.merge_compaction_group(
                            group_merge.left_group_id,
                            group_merge.right_group_id,
                        )
                    }
                    GroupDeltaCommon::IntraLevel(level_delta) => {
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });
                        if is_commit_epoch {
                            assert!(
                                level_delta.removed_table_ids.is_empty(),
                                "no sst should be deleted when committing an epoch"
                            );

                            let IntraLevelDelta {
                                level_idx,
                                l0_sub_level_id,
                                inserted_table_infos,
                                ..
                            } = level_delta;
                            {
                                assert_eq!(
                                    *level_idx, 0,
                                    "we should only add to L0 when we commit an epoch."
                                );
                                if !inserted_table_infos.is_empty() {
                                    insert_new_sub_level(
                                        &mut levels.l0,
                                        *l0_sub_level_id,
                                        PbLevelType::Overlapping,
                                        inserted_table_infos.clone(),
                                        None,
                                    );
                                }
                            }
                        } else {
                            // The delta is caused by compaction.
                            levels.apply_compact_ssts(
                                level_delta,
                                self.state_table_info
                                    .compaction_group_member_table_ids(*compaction_group_id),
                            );
                            if level_delta.level_idx == 0 {
                                is_applied_l0_compact = true;
                            }
                        }
                    }
                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });
                        assert!(is_commit_epoch);

                        if !inserted_table_infos.is_empty() {
                            let next_l0_sub_level_id = levels
                                .l0
                                .sub_levels
                                .last()
                                .map(|level| level.sub_level_id + 1)
                                .unwrap_or(1);

                            insert_new_sub_level(
                                &mut levels.l0,
                                next_l0_sub_level_id,
                                PbLevelType::Overlapping,
                                inserted_table_infos.clone(),
                                None,
                            );
                        }
                    }
                    GroupDeltaCommon::GroupDestroy(_) => {
                        self.levels.remove(compaction_group_id);
                    }

                    GroupDeltaCommon::TruncateTables(table_ids) => {
                        // iterate all levels and truncate the specified tables with the given table ids
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });

                        for sub_level in &mut levels.l0.sub_levels {
                            sub_level.table_infos.iter_mut().for_each(|sstable_info| {
                                let mut inner = sstable_info.get_inner();
                                inner.table_ids.retain(|id| !table_ids.contains(id));
                                sstable_info.set_inner(inner);
                            });
                        }

                        for level in &mut levels.levels {
                            level.table_infos.iter_mut().for_each(|sstable_info| {
                                let mut inner = sstable_info.get_inner();
                                inner.table_ids.retain(|id| !table_ids.contains(id));
                                sstable_info.set_inner(inner);
                            });
                        }

                        levels.compaction_group_version_id += 1;
                    }
                }
            }

            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
            {
                levels.post_apply_l0_compact();
            }
        }
        self.id = version_delta.id;
        #[expect(deprecated)]
        {
            self.max_committed_epoch = version_delta.max_committed_epoch;
        }

        // Apply to table watermarks.

        // Stores the table watermarks that need to be updated. `None` means the table watermark
        // of that table id should be removed.
        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
            HashMap::new();

        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
                if version_delta.removed_table_ids.contains(table_id) {
                    modified_table_watermarks.insert(*table_id, None);
                } else {
                    let mut current_table_watermarks = (**current_table_watermarks).clone();
                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
                }
            } else {
                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
            }
        }
        for (table_id, table_watermarks) in &self.table_watermarks {
            let safe_epoch = if let Some(state_table_info) =
                self.state_table_info.info().get(table_id)
                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
                && state_table_info.committed_epoch > *oldest_epoch
            {
                // safe epoch has progressed, need further clear.
                state_table_info.committed_epoch
            } else {
                // safe epoch not progressed or the table has been removed. No need to truncate
                continue;
            };
            let table_watermarks = modified_table_watermarks
                .entry(*table_id)
                .or_insert_with(|| Some((**table_watermarks).clone()));
            if let Some(table_watermarks) = table_watermarks {
                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
            }
        }
        // apply the staging table watermark to hummock version
        for (table_id, table_watermarks) in modified_table_watermarks {
            if let Some(table_watermarks) = table_watermarks {
                self.table_watermarks
                    .insert(table_id, Arc::new(table_watermarks));
            } else {
                self.table_watermarks.remove(&table_id);
            }
        }

        // apply to table change log
        Self::apply_change_log_delta(
            &mut self.table_change_log,
            &version_delta.change_log_delta,
            &version_delta.removed_table_ids,
            &version_delta.state_table_info_delta,
            &changed_table_info,
        );

        // apply to vector index
        apply_vector_index_delta(
            &mut self.vector_indexes,
            &version_delta.vector_index_delta,
            &version_delta.removed_table_ids,
        );
    }

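    /// Applies `change_log_delta` to the per-table change logs: appends each new log entry, drops
    /// the change log of removed tables and of tables whose committed epoch progressed without a
    /// corresponding change log entry (logging a warning in that case), and finally truncates every
    /// remaining change log to its `truncate_epoch`.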
    pub fn apply_change_log_delta<T: Clone>(
        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
        removed_table_ids: &HashSet<TableId>,
        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
    ) {
        for (table_id, change_log_delta) in change_log_delta {
            let new_change_log = &change_log_delta.new_log;
            match table_change_log.entry(*table_id) {
                Entry::Occupied(entry) => {
                    let change_log = entry.into_mut();
                    change_log.add_change_log(new_change_log.clone());
                }
                Entry::Vacant(entry) => {
                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
                }
            };
        }

        // If a table has no new change log entry (not even an empty one), it means we have stopped
        // maintaining the change log for that table, so its table change log is removed.
        // The table change log is also removed when the table id itself is removed.
        table_change_log.retain(|table_id, _| {
            if removed_table_ids.contains(table_id) {
                return false;
            }
            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
                && table_info_delta.committed_epoch > prev_table_info.committed_epoch
            {
                // the table existed previously, and its committed epoch has progressed.
            } else {
                // otherwise, the table change log should be kept anyway
                return true;
            }
            let contains = change_log_delta.contains_key(table_id);
            if !contains {
                warn!(
                    ?table_id,
                    "table change log dropped due to no further change log at newly committed epoch",
                );
            }
            contains
        });

        // truncate the remaining table change log
        for (table_id, change_log_delta) in change_log_delta {
            if let Some(change_log) = table_change_log.get_mut(table_id) {
                change_log.truncate(change_log_delta.truncate_epoch);
            }
        }
    }

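    /// Collects all branched SSTs in the version: for every SST whose `sst_id` differs from its
    /// `object_id`, records which compaction groups reference the object and under which SST ids.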
    pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
        let mut ret: BTreeMap<_, _> = BTreeMap::new();
        for (compaction_group_id, group) in &self.levels {
            let mut levels = vec![];
            levels.extend(group.l0.sub_levels.iter());
            levels.extend(group.levels.iter());
            for level in levels {
                for table_info in &level.table_infos {
                    if table_info.sst_id.inner() == table_info.object_id.inner() {
                        continue;
                    }
                    let object_id = table_info.object_id;
                    let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
                    entry
                        .entry(*compaction_group_id)
                        .or_default()
                        .push(table_info.sst_id)
                }
            }
        }
        ret
    }

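    /// Merges `right_group_id` into `left_group_id`: removes the right group's levels from the
    /// version and folds them into the left group via `group_split::merge_levels`, after checking
    /// that the member table ids of the two groups form a sorted sequence when chained.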
    pub fn merge_compaction_group(
        &mut self,
        left_group_id: CompactionGroupId,
        right_group_id: CompactionGroupId,
    ) {
        // Double check
        let left_group_id_table_ids = self
            .state_table_info
            .compaction_group_member_table_ids(left_group_id)
            .iter();
        let right_group_id_table_ids = self
            .state_table_info
            .compaction_group_member_table_ids(right_group_id)
            .iter();

        assert!(
            left_group_id_table_ids
                .chain(right_group_id_table_ids)
                .is_sorted()
        );

        let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
        let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
            panic!(
                "compaction group should exist right {} all {:?}",
                right_group_id, total_cg
            )
        });

        let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
            panic!(
                "compaction group should exist left {} all {:?}",
                left_group_id, total_cg
            )
        });

        group_split::merge_levels(left_levels, right_levels);
    }

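    /// Split-key based variant of `init_with_parent_group`, used for
    /// `CompatibilityVersion::SplitGroupByTableId` and later. Instead of splitting by member table
    /// ids, every SST of the parent group that needs splitting at `split_key` is split there, and
    /// the SSTs belonging to the new group are inserted into `group_id` with new SST ids starting
    /// from `new_sst_start_id`. Both groups bump `compaction_group_version_id` to expire in-flight
    /// compaction tasks.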
    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: HummockSstableId,
        split_key: Option<Bytes>,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After a compaction group operation such as a split, any ongoing compaction task created
        // before the operation must be rejected as expired. By incrementing the group's
        // `compaction_group_version_id` and comparing it with the value recorded in the compaction
        // task, such expired tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Removing SSTs from a sub level may leave the sub level empty. Empty sub levels
                // are purged whenever another compaction task finishes.
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                if insert_table_infos.is_empty() {
                    continue;
                }

                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }

        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
}

impl<T> HummockVersionCommon<T>
where
    T: SstableIdReader + ObjectIdReader,
{
    pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
        self.get_sst_infos(exclude_change_log)
            .map(|s| s.object_id())
            .collect()
    }

    pub fn get_object_ids(
        &self,
        exclude_change_log: bool,
    ) -> impl Iterator<Item = HummockObjectId> + '_ {
        // DO NOT REMOVE THIS LINE
        // This is to ensure that when adding new variant to `HummockObjectId`,
        // the compiler will warn us if we forget to handle it here.
        match HummockObjectId::Sstable(0.into()) {
            HummockObjectId::Sstable(_) => {}
            HummockObjectId::VectorFile(_) => {}
            HummockObjectId::HnswGraphFile(_) => {}
        };
        self.get_sst_infos(exclude_change_log)
            .map(|s| HummockObjectId::Sstable(s.object_id()))
            .chain(
                self.vector_indexes
                    .values()
                    .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
            )
    }

    pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
        self.get_sst_infos(exclude_change_log)
            .map(|s| s.sst_id())
            .collect()
    }

    pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
        let may_table_change_log = if exclude_change_log {
            None
        } else {
            Some(self.table_change_log.values())
        };
        self.get_combined_levels()
            .flat_map(|level| level.table_infos.iter())
            .chain(
                may_table_change_log
                    .map(|v| {
                        v.flat_map(|table_change_log| {
                            table_change_log.iter().flat_map(|epoch_change_log| {
                                epoch_change_log
                                    .old_value
                                    .iter()
                                    .chain(epoch_change_log.new_value.iter())
                            })
                        })
                    })
                    .into_iter()
                    .flatten(),
            )
    }
}

impl Levels {
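    /// Applies a compaction-generated intra-level delta to this group's levels: removes the SSTs
    /// listed in `removed_table_ids` and inserts the new SSTs into the target level or L0 sub
    /// level, adjusting `vnode_partition_count` where applicable. If the delta was produced by a
    /// compaction task that is expired with respect to the current `compaction_group_version_id`,
    /// it is ignored with a warning.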
    pub(crate) fn apply_compact_ssts(
        &mut self,
        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
        member_table_ids: &BTreeSet<TableId>,
    ) {
        let IntraLevelDeltaCommon {
            level_idx,
            l0_sub_level_id,
            inserted_table_infos: insert_table_infos,
            vnode_partition_count,
            removed_table_ids: delete_sst_ids_set,
            compaction_group_version_id,
        } = level_delta;
        let new_vnode_partition_count = *vnode_partition_count;

        if is_compaction_task_expired(
            self.compaction_group_version_id,
            *compaction_group_version_id,
        ) {
            warn!(
                current_compaction_group_version_id = self.compaction_group_version_id,
                delta_compaction_group_version_id = compaction_group_version_id,
                level_idx,
                l0_sub_level_id,
                insert_table_infos = ?insert_table_infos
                    .iter()
                    .map(|sst| (sst.sst_id, sst.object_id))
                    .collect_vec(),
                ?delete_sst_ids_set,
                "This VersionDelta may be committed by an expired compact task. Please check it."
            );
            return;
        }
        if !delete_sst_ids_set.is_empty() {
            if *level_idx == 0 {
                for level in &mut self.l0.sub_levels {
                    level_delete_ssts(level, delete_sst_ids_set);
                }
            } else {
                let idx = *level_idx as usize - 1;
                level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
            }
        }

        if !insert_table_infos.is_empty() {
            let insert_sst_level_id = *level_idx;
            let insert_sub_level_id = *l0_sub_level_id;
            if insert_sst_level_id == 0 {
                let l0 = &mut self.l0;
                let index = l0
                    .sub_levels
                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
                assert!(
                    index < l0.sub_levels.len()
                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
                    "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
                    insert_sub_level_id,
                    delete_sst_ids_set,
                    l0.sub_levels
                        .iter()
                        .map(|level| level.sub_level_id)
                        .collect_vec()
                );
                if l0.sub_levels[index].table_infos.is_empty()
                    && member_table_ids.len() == 1
                    && insert_table_infos.iter().all(|sst| {
                        sst.table_ids.len() == 1
                            && sst.table_ids[0]
                                == *member_table_ids.iter().next().expect("non-empty")
                    })
                {
                    // Only change `vnode_partition_count` for a group that has a single state
                    // table, and only when this compact task updated every SST file in the level.
                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
                }
                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
            } else {
                let idx = insert_sst_level_id as usize - 1;
                if self.levels[idx].table_infos.is_empty()
                    && insert_table_infos
                        .iter()
                        .all(|sst| sst.table_ids.len() == 1)
                {
                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
                } else if self.levels[idx].vnode_partition_count != 0
                    && new_vnode_partition_count == 0
                    && member_table_ids.len() > 1
                {
                    self.levels[idx].vnode_partition_count = 0;
                }
                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
            }
        }
    }

    pub(crate) fn post_apply_l0_compact(&mut self) {
        {
            self.l0
                .sub_levels
                .retain(|level| !level.table_infos.is_empty());
            self.l0.total_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.total_file_size)
                .sum::<u64>();
            self.l0.uncompressed_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.uncompressed_file_size)
                .sum::<u64>();
        }
    }
}

impl<T, L> HummockVersionCommon<T, L> {
    pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
        self.levels
            .values()
            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
    }
}

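/// Creates an empty `Levels` for a newly constructed compaction group: `max_level` empty
/// non-overlapping levels plus an empty L0. `parent_group_id` defaults to `NewCompactionGroup`
/// and is overwritten by the caller when the group is constructed from a parent.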
pub fn build_initial_compaction_group_levels(
    group_id: CompactionGroupId,
    compaction_config: &CompactionConfig,
) -> Levels {
    let mut levels = vec![];
    for l in 0..compaction_config.get_max_level() {
        levels.push(Level {
            level_idx: (l + 1) as u32,
            level_type: PbLevelType::Nonoverlapping,
            table_infos: vec![],
            total_file_size: 0,
            sub_level_id: 0,
            uncompressed_file_size: 0,
            vnode_partition_count: 0,
        });
    }
    #[expect(deprecated)] // for backward-compatibility of previous hummock version delta
    Levels {
        levels,
        l0: OverlappingLevel {
            sub_levels: vec![],
            total_file_size: 0,
            uncompressed_file_size: 0,
        },
        group_id,
        parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
        member_table_ids: vec![],
        compaction_group_version_id: 0,
    }
}

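/// For every SST in `level` that contains any of `member_table_ids`, splits the SST into the part
/// staying in the parent group and a branch SST (with a fresh id from `new_sst_id`) for the new
/// group, assigning each half roughly half of the original `sst_size`. Returns the branch SSTs to
/// be inserted into the new group.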
fn split_sst_info_for_level(
    member_table_ids: &BTreeSet<TableId>,
    level: &mut Level,
    new_sst_id: &mut HummockSstableId,
) -> Vec<SstableInfo> {
    // Removing SSTs from a sub level may leave the sub level empty. Empty sub levels are purged
    // whenever another compaction task finishes.
    let mut insert_table_infos = vec![];
    for sst_info in &mut level.table_infos {
        let removed_table_ids = sst_info
            .table_ids
            .iter()
            .filter(|table_id| member_table_ids.contains(table_id))
            .cloned()
            .collect_vec();
        let sst_size = sst_info.sst_size;
        if sst_size / 2 == 0 {
            tracing::warn!(
                id = %sst_info.sst_id,
                object_id = %sst_info.object_id,
                sst_size = sst_info.sst_size,
                file_size = sst_info.file_size,
                "Sstable sst_size is under expected",
            );
        };
        if !removed_table_ids.is_empty() {
            let (modified_sst, branch_sst) = split_sst_with_table_ids(
                sst_info,
                new_sst_id,
                sst_size / 2,
                sst_size / 2,
                member_table_ids.iter().cloned().collect_vec(),
            );
            *sst_info = modified_sst;
            insert_table_infos.push(branch_sst);
        }
    }
    insert_table_infos
}

/// Gets all compaction group ids.
pub fn get_compaction_group_ids(
    version: &HummockVersion,
) -> impl Iterator<Item = CompactionGroupId> + '_ {
    version.levels.keys().cloned()
}

pub fn get_table_compaction_group_id_mapping(
    version: &HummockVersion,
) -> HashMap<StateTableId, CompactionGroupId> {
    version
        .state_table_info
        .info()
        .iter()
        .map(|(table_id, info)| (*table_id, info.compaction_group_id))
        .collect()
}

/// Gets all SSTs in `group_id`
pub fn get_compaction_group_ssts(
    version: &HummockVersion,
    group_id: CompactionGroupId,
) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
    let group_levels = version.get_compaction_group_levels(group_id);
    group_levels
        .l0
        .sub_levels
        .iter()
        .rev()
        .chain(group_levels.levels.iter())
        .flat_map(|level| {
            level
                .table_infos
                .iter()
                .map(|table_info| (table_info.object_id, table_info.sst_id))
        })
}

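/// Builds a new L0 sub level from `table_infos`, computing its total and uncompressed file sizes;
/// for non-overlapping levels the SSTs are asserted to be concatenatable.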
1289pub fn new_sub_level(
1290    sub_level_id: u64,
1291    level_type: PbLevelType,
1292    table_infos: Vec<SstableInfo>,
1293) -> Level {
1294    if level_type == PbLevelType::Nonoverlapping {
1295        debug_assert!(
1296            can_concat(&table_infos),
1297            "sst of non-overlapping level is not concat-able: {:?}",
1298            table_infos
1299        );
1300    }
1301    let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1302    let uncompressed_file_size = table_infos
1303        .iter()
1304        .map(|table| table.uncompressed_file_size)
1305        .sum();
1306    Level {
1307        level_idx: 0,
1308        level_type,
1309        table_infos,
1310        total_file_size,
1311        sub_level_id,
1312        uncompressed_file_size,
1313        vnode_partition_count: 0,
1314    }
1315}
1316
1317pub fn add_ssts_to_sub_level(
1318    l0: &mut OverlappingLevel,
1319    sub_level_idx: usize,
1320    insert_table_infos: Vec<SstableInfo>,
1321) {
1322    insert_table_infos.iter().for_each(|sst| {
1323        l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1324        l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1325        l0.total_file_size += sst.sst_size;
1326        l0.uncompressed_file_size += sst.uncompressed_file_size;
1327    });
1328    l0.sub_levels[sub_level_idx]
1329        .table_infos
1330        .extend(insert_table_infos);
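    // For a non-overlapping sub level, re-sort by key range and assert that the SSTs can
    // still be concatenated after the insertion.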
1331    if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1332        l0.sub_levels[sub_level_idx]
1333            .table_infos
1334            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1335        assert!(
1336            can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1337            "sstable ids: {:?}",
1338            l0.sub_levels[sub_level_idx]
1339                .table_infos
1340                .iter()
1341                .map(|sst| sst.sst_id)
1342                .collect_vec()
1343        );
1344    }
1345}
1346
1347/// A `None` value of `sub_level_insert_hint` means appending to the end of L0.
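///
/// A minimal usage sketch (not compiled here); `l0: OverlappingLevel` and
/// `ssts: Vec<SstableInfo>` are assumed to be in scope, and `42` is an arbitrary sub level id:
///
/// ```ignore
/// // Append a new overlapping sub level containing `ssts` at the end of L0.
/// insert_new_sub_level(&mut l0, 42, PbLevelType::Overlapping, ssts, None);
/// ```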
1348pub fn insert_new_sub_level(
1349    l0: &mut OverlappingLevel,
1350    insert_sub_level_id: u64,
1351    level_type: PbLevelType,
1352    insert_table_infos: Vec<SstableInfo>,
1353    sub_level_insert_hint: Option<usize>,
1354) {
1355    if insert_sub_level_id == u64::MAX {
1356        return;
1357    }
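    // Determine the insertion position: use the caller-provided hint if any, otherwise
    // append to the end and assert that the new sub level id is the largest so far.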
1358    let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1359        insert_pos
1360    } else {
1361        if let Some(newest_level) = l0.sub_levels.last() {
1362            assert!(
1363                newest_level.sub_level_id < insert_sub_level_id,
1364                "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1365                newest_level.sub_level_id,
1366                insert_sub_level_id,
1367                l0,
1368            );
1369        }
1370        l0.sub_levels.len()
1371    };
1372    #[cfg(debug_assertions)]
1373    {
1374        if insert_pos > 0
1375            && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1376        {
1377            debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1378        }
1379        if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1380            debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1381        }
1382    }
1383    // All files will be committed in one new Overlapping sub-level and become
1384    // Nonoverlapping after at least one compaction.
1385    let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1386    l0.total_file_size += level.total_file_size;
1387    l0.uncompressed_file_size += level.uncompressed_file_size;
1388    l0.sub_levels.insert(insert_pos, level);
1389}
1390
1391/// Delete sstables whose sst id is in the given id set.
1392///
1393/// Return `true` if some sst is deleted, and `false` if the deletion is trivial (nothing was removed).
1394fn level_delete_ssts(
1395    operand: &mut Level,
1396    delete_sst_ids_superset: &HashSet<HummockSstableId>,
1397) -> bool {
1398    let original_len = operand.table_infos.len();
1399    operand
1400        .table_infos
1401        .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
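    // Recompute the aggregated size statistics from the SSTs that remain.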
1402    operand.total_file_size = operand
1403        .table_infos
1404        .iter()
1405        .map(|table| table.sst_size)
1406        .sum::<u64>();
1407    operand.uncompressed_file_size = operand
1408        .table_infos
1409        .iter()
1410        .map(|table| table.uncompressed_file_size)
1411        .sum::<u64>();
1412    original_len != operand.table_infos.len()
1413}
1414
1415fn level_insert_ssts(operand: &mut Level, insert_table_infos: &[SstableInfo]) {
1416    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1417        format!(
1418            "sstable ids: {:?}",
1419            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1420        )
1421    }
1422    operand.total_file_size += insert_table_infos
1423        .iter()
1424        .map(|sst| sst.sst_size)
1425        .sum::<u64>();
1426    operand.uncompressed_file_size += insert_table_infos
1427        .iter()
1428        .map(|sst| sst.uncompressed_file_size)
1429        .sum::<u64>();
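    // An overlapping level receiving new SSTs is promoted to non-overlapping; in either case
    // the level must end up sorted by key range and concat-able.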
1430    if operand.level_type == PbLevelType::Overlapping {
1431        operand.level_type = PbLevelType::Nonoverlapping;
1432        operand
1433            .table_infos
1434            .extend(insert_table_infos.iter().cloned());
1435        operand
1436            .table_infos
1437            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1438        assert!(
1439            can_concat(&operand.table_infos),
1440            "{}",
1441            display_sstable_infos(&operand.table_infos)
1442        );
1443    } else if !insert_table_infos.is_empty() {
1444        let sorted_insert: Vec<_> = insert_table_infos
1445            .iter()
1446            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1447            .cloned()
1448            .collect();
1449        let first = &sorted_insert[0];
1450        let last = &sorted_insert[sorted_insert.len() - 1];
1451        let pos = operand
1452            .table_infos
1453            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1454        if pos >= operand.table_infos.len()
1455            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1456        {
1457            operand.table_infos.splice(pos..pos, sorted_insert);
1458            // Validate the inserted SST batch together with the SSTs immediately before and after it.
1459            let validate_range = operand
1460                .table_infos
1461                .iter()
1462                .skip(pos.saturating_sub(1))
1463                .take(insert_table_infos.len() + 2)
1464                .collect_vec();
1465            assert!(
1466                can_concat(&validate_range),
1467                "{}",
1468                display_sstable_infos(&validate_range),
1469            );
1470        } else {
1471            // If this branch is reached, it indicates some unexpected behavior in compaction.
1472            // Issue a warning and fall back to inserting the SSTs one by one.
1473            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1474            for i in insert_table_infos {
1475                let pos = operand
1476                    .table_infos
1477                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1478                operand.table_infos.insert(pos, i.clone());
1479            }
1480            assert!(
1481                can_concat(&operand.table_infos),
1482                "{}",
1483                display_sstable_infos(&operand.table_infos)
1484            );
1485        }
1486    }
1487}
1488
1489pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1490    // DO NOT REMOVE THIS LINE
1491    // This is to ensure that when adding new variant to `HummockObjectId`,
1492    // the compiler will warn us if we forget to handle it here.
1493    match HummockObjectId::Sstable(0.into()) {
1494        HummockObjectId::Sstable(_) => {}
1495        HummockObjectId::VectorFile(_) => {}
1496        HummockObjectId::HnswGraphFile(_) => {}
1497    };
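    // Collect object sizes from every level of every compaction group, from the old-value and
    // new-value SSTs recorded in the table change log, and from vector index files.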
1498    version
1499        .levels
1500        .values()
1501        .flat_map(|cg| {
1502            cg.level0()
1503                .sub_levels
1504                .iter()
1505                .chain(cg.levels.iter())
1506                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1507        })
1508        .chain(version.table_change_log.values().flat_map(|c| {
1509            c.iter().flat_map(|l| {
1510                l.old_value
1511                    .iter()
1512                    .chain(l.new_value.iter())
1513                    .map(|t| (t.object_id, t.file_size))
1514            })
1515        }))
1516        .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1517        .chain(
1518            version
1519                .vector_indexes
1520                .values()
1521                .flat_map(|index| index.get_objects()),
1522        )
1523        .collect()
1524}
1525
1526/// Verify the validity of a `HummockVersion` and return a list of violations, if any.
1527/// Currently this method is only used by `risectl validate-version`.
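///
/// A minimal usage sketch (not compiled here); assumes a `version: HummockVersion` is in scope:
///
/// ```ignore
/// for violation in validate_version(&version) {
///     eprintln!("validation error: {violation}");
/// }
/// ```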
1528pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1529    let mut res = Vec::new();
1530    // Ensure each table maps to only one compaction group
1531    for (group_id, levels) in &version.levels {
1532        // Ensure compaction group id matches
1533        if levels.group_id != *group_id {
1534            res.push(format!(
1535                "GROUP {}: inconsistent group id {} in Levels",
1536                group_id, levels.group_id
1537            ));
1538        }
1539
1540        let validate_level = |group: CompactionGroupId,
1541                              expected_level_idx: u32,
1542                              level: &Level,
1543                              res: &mut Vec<String>| {
1544            let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1545            if level.level_idx == 0 {
1546                level_identifier.push_str(format!(" SUBLEVEL {}", level.sub_level_id).as_str());
1547                // Ensure sub-level is not empty
1548                if level.table_infos.is_empty() {
1549                    res.push(format!("{}: empty level", level_identifier));
1550                }
1551            } else if level.level_type != PbLevelType::Nonoverlapping {
1552                // Ensure non-L0 level is non-overlapping level
1553                res.push(format!(
1554                    "{}: level type {:?} is not non-overlapping",
1555                    level_identifier, level.level_type
1556                ));
1557            }
1558
1559            // Ensure level idx matches
1560            if level.level_idx != expected_level_idx {
1561                res.push(format!(
1562                    "{}: mismatched level idx, expected {}",
1563                    level_identifier, expected_level_idx
1564                ));
1565            }
1566
1567            let mut prev_table_info: Option<&SstableInfo> = None;
1568            for table_info in &level.table_infos {
1569                // Ensure table_ids are sorted and unique
1570                if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1571                    res.push(format!(
1572                        "{} SST {}: table_ids not sorted or not unique",
1573                        level_identifier, table_info.object_id
1574                    ));
1575                }
1576
1577                // Ensure SSTs in non-overlapping level have non-overlapping key range
1578                if level.level_type == PbLevelType::Nonoverlapping {
1579                    if let Some(prev) = prev_table_info.take()
1580                        && prev
1581                            .key_range
1582                            .compare_right_with(&table_info.key_range.left)
1583                            != Ordering::Less
1584                    {
1585                        res.push(format!(
1586                            "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1587                            level_identifier, table_info.object_id, prev, table_info
1588                        ));
1589                    }
1590                    let _ = prev_table_info.insert(table_info);
1591                }
1592            }
1593        };
1594
1595        let l0 = &levels.l0;
1596        let mut prev_sub_level_id = u64::MAX;
1597        for sub_level in &l0.sub_levels {
1598            // Ensure sub_level_id is sorted in ascending order and unique
1599            if prev_sub_level_id != u64::MAX && sub_level.sub_level_id <= prev_sub_level_id {
1600                res.push(format!(
1601                    "GROUP {} LEVEL 0: sub_level_id {} <= prev_sub_level_id {}",
1602                    group_id, sub_level.sub_level_id, prev_sub_level_id
1603                ));
1604            }
1605            prev_sub_level_id = sub_level.sub_level_id;
1606
1607            validate_level(*group_id, 0, sub_level, &mut res);
1608        }
1609
1610        for idx in 1..=levels.levels.len() {
1611            validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1612        }
1613    }
1614    res
1615}
1616
1617#[cfg(test)]
1618mod tests {
1619    use std::collections::{HashMap, HashSet};
1620
1621    use bytes::Bytes;
1622    use risingwave_common::catalog::TableId;
1623    use risingwave_common::hash::VirtualNode;
1624    use risingwave_common::util::epoch::test_epoch;
1625    use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1626
1627    use super::group_split;
1628    use crate::HummockVersionId;
1629    use crate::compaction_group::group_split::*;
1630    use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1631    use crate::key::{FullKey, gen_key_from_str};
1632    use crate::key_range::KeyRange;
1633    use crate::level::{Level, Levels, OverlappingLevel};
1634    use crate::sstable_info::{SstableInfo, SstableInfoInner};
1635    use crate::version::{
1636        GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1637    };
1638
1639    fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1640        gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1641    }
1642
1643    fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1644        let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1645        let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1646        let full_key_l = FullKey::for_test(
1647            TableId::new(*table_ids.first().unwrap()),
1648            table_key_l,
1649            epoch,
1650        )
1651        .encode();
1652        let full_key_r =
1653            FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1654                .encode();
1655
1656        SstableInfoInner {
1657            sst_id: sst_id.into(),
1658            key_range: KeyRange {
1659                left: full_key_l.into(),
1660                right: full_key_r.into(),
1661                right_exclusive: false,
1662            },
1663            table_ids: table_ids.into_iter().map(Into::into).collect(),
1664            object_id: sst_id.into(),
1665            min_epoch: 20,
1666            max_epoch: 20,
1667            file_size: 100,
1668            sst_size: 100,
1669            ..Default::default()
1670        }
1671    }
1672
1673    #[test]
1674    fn test_get_sst_object_ids() {
1675        let mut version = HummockVersion {
1676            id: HummockVersionId::new(0),
1677            levels: HashMap::from_iter([(
1678                0,
1679                Levels {
1680                    levels: vec![],
1681                    l0: OverlappingLevel {
1682                        sub_levels: vec![],
1683                        total_file_size: 0,
1684                        uncompressed_file_size: 0,
1685                    },
1686                    ..Default::default()
1687                },
1688            )]),
1689            ..Default::default()
1690        };
1691        assert_eq!(version.get_object_ids(false).count(), 0);
1692
1693        // Add to sub level
1694        version
1695            .levels
1696            .get_mut(&0)
1697            .unwrap()
1698            .l0
1699            .sub_levels
1700            .push(Level {
1701                table_infos: vec![
1702                    SstableInfoInner {
1703                        object_id: 11.into(),
1704                        sst_id: 11.into(),
1705                        ..Default::default()
1706                    }
1707                    .into(),
1708                ],
1709                ..Default::default()
1710            });
1711        assert_eq!(version.get_object_ids(false).count(), 1);
1712
1713        // Add to non sub level
1714        version.levels.get_mut(&0).unwrap().levels.push(Level {
1715            table_infos: vec![
1716                SstableInfoInner {
1717                    object_id: 22.into(),
1718                    sst_id: 22.into(),
1719                    ..Default::default()
1720                }
1721                .into(),
1722            ],
1723            ..Default::default()
1724        });
1725        assert_eq!(version.get_object_ids(false).count(), 2);
1726    }
1727
1728    #[test]
1729    fn test_apply_version_delta() {
1730        let mut version = HummockVersion {
1731            id: HummockVersionId::new(0),
1732            levels: HashMap::from_iter([
1733                (
1734                    0,
1735                    build_initial_compaction_group_levels(
1736                        0,
1737                        &CompactionConfig {
1738                            max_level: 6,
1739                            ..Default::default()
1740                        },
1741                    ),
1742                ),
1743                (
1744                    1,
1745                    build_initial_compaction_group_levels(
1746                        1,
1747                        &CompactionConfig {
1748                            max_level: 6,
1749                            ..Default::default()
1750                        },
1751                    ),
1752                ),
1753            ]),
1754            ..Default::default()
1755        };
1756        let version_delta = HummockVersionDelta {
1757            id: HummockVersionId::new(1),
1758            group_deltas: HashMap::from_iter([
1759                (
1760                    2,
1761                    GroupDeltas {
1762                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1763                            group_config: Some(CompactionConfig {
1764                                max_level: 6,
1765                                ..Default::default()
1766                            }),
1767                            ..Default::default()
1768                        }))],
1769                    },
1770                ),
1771                (
1772                    0,
1773                    GroupDeltas {
1774                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1775                    },
1776                ),
1777                (
1778                    1,
1779                    GroupDeltas {
1780                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1781                            1,
1782                            0,
1783                            HashSet::new(),
1784                            vec![
1785                                SstableInfoInner {
1786                                    object_id: 1.into(),
1787                                    sst_id: 1.into(),
1788                                    ..Default::default()
1789                                }
1790                                .into(),
1791                            ],
1792                            0,
1793                            version
1794                                .levels
1795                                .get(&1)
1796                                .as_ref()
1797                                .unwrap()
1798                                .compaction_group_version_id,
1799                        ))],
1800                    },
1801                ),
1802            ]),
1803            ..Default::default()
1804        };
1805        let version_delta = version_delta;
1806
1807        version.apply_version_delta(&version_delta);
1808        let mut cg1 = build_initial_compaction_group_levels(
1809            1,
1810            &CompactionConfig {
1811                max_level: 6,
1812                ..Default::default()
1813            },
1814        );
1815        cg1.levels[0] = Level {
1816            level_idx: 1,
1817            level_type: LevelType::Nonoverlapping,
1818            table_infos: vec![
1819                SstableInfoInner {
1820                    object_id: 1.into(),
1821                    sst_id: 1.into(),
1822                    ..Default::default()
1823                }
1824                .into(),
1825            ],
1826            ..Default::default()
1827        };
1828        assert_eq!(
1829            version,
1830            HummockVersion {
1831                id: HummockVersionId::new(1),
1832                levels: HashMap::from_iter([
1833                    (
1834                        2,
1835                        build_initial_compaction_group_levels(
1836                            2,
1837                            &CompactionConfig {
1838                                max_level: 6,
1839                                ..Default::default()
1840                            },
1841                        ),
1842                    ),
1843                    (1, cg1),
1844                ]),
1845                ..Default::default()
1846            }
1847        );
1848    }
1849
1850    fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1851        gen_sst_info_impl(object_id, table_ids, left, right).into()
1852    }
1853
1854    fn gen_sst_info_impl(
1855        object_id: u64,
1856        table_ids: Vec<u32>,
1857        left: Bytes,
1858        right: Bytes,
1859    ) -> SstableInfoInner {
1860        SstableInfoInner {
1861            object_id: object_id.into(),
1862            sst_id: object_id.into(),
1863            key_range: KeyRange {
1864                left,
1865                right,
1866                right_exclusive: false,
1867            },
1868            table_ids: table_ids.into_iter().map(Into::into).collect(),
1869            file_size: 100,
1870            sst_size: 100,
1871            uncompressed_file_size: 100,
1872            ..Default::default()
1873        }
1874    }
1875
1876    #[test]
1877    fn test_merge_levels() {
1878        let mut left_levels = build_initial_compaction_group_levels(
1879            1,
1880            &CompactionConfig {
1881                max_level: 6,
1882                ..Default::default()
1883            },
1884        );
1885
1886        let mut right_levels = build_initial_compaction_group_levels(
1887            2,
1888            &CompactionConfig {
1889                max_level: 6,
1890                ..Default::default()
1891            },
1892        );
1893
1894        left_levels.levels[0] = Level {
1895            level_idx: 1,
1896            level_type: LevelType::Nonoverlapping,
1897            table_infos: vec![
1898                gen_sst_info(
1899                    1,
1900                    vec![3],
1901                    FullKey::for_test(
1902                        TableId::new(3),
1903                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1904                        0,
1905                    )
1906                    .encode()
1907                    .into(),
1908                    FullKey::for_test(
1909                        TableId::new(3),
1910                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1911                        0,
1912                    )
1913                    .encode()
1914                    .into(),
1915                ),
1916                gen_sst_info(
1917                    10,
1918                    vec![3, 4],
1919                    FullKey::for_test(
1920                        TableId::new(3),
1921                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1922                        0,
1923                    )
1924                    .encode()
1925                    .into(),
1926                    FullKey::for_test(
1927                        TableId::new(4),
1928                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1929                        0,
1930                    )
1931                    .encode()
1932                    .into(),
1933                ),
1934                gen_sst_info(
1935                    11,
1936                    vec![4],
1937                    FullKey::for_test(
1938                        TableId::new(4),
1939                        gen_key_from_str(VirtualNode::from_index(11), "1"),
1940                        0,
1941                    )
1942                    .encode()
1943                    .into(),
1944                    FullKey::for_test(
1945                        TableId::new(4),
1946                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1947                        0,
1948                    )
1949                    .encode()
1950                    .into(),
1951                ),
1952            ],
1953            total_file_size: 300,
1954            ..Default::default()
1955        };
1956
1957        left_levels.l0.sub_levels.push(Level {
1958            level_idx: 0,
1959            table_infos: vec![gen_sst_info(
1960                3,
1961                vec![3],
1962                FullKey::for_test(
1963                    TableId::new(3),
1964                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1965                    0,
1966                )
1967                .encode()
1968                .into(),
1969                FullKey::for_test(
1970                    TableId::new(3),
1971                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1972                    0,
1973                )
1974                .encode()
1975                .into(),
1976            )],
1977            sub_level_id: 101,
1978            level_type: LevelType::Overlapping,
1979            total_file_size: 100,
1980            ..Default::default()
1981        });
1982
1983        left_levels.l0.sub_levels.push(Level {
1984            level_idx: 0,
1985            table_infos: vec![gen_sst_info(
1986                3,
1987                vec![3],
1988                FullKey::for_test(
1989                    TableId::new(3),
1990                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1991                    0,
1992                )
1993                .encode()
1994                .into(),
1995                FullKey::for_test(
1996                    TableId::new(3),
1997                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1998                    0,
1999                )
2000                .encode()
2001                .into(),
2002            )],
2003            sub_level_id: 103,
2004            level_type: LevelType::Overlapping,
2005            total_file_size: 100,
2006            ..Default::default()
2007        });
2008
2009        left_levels.l0.sub_levels.push(Level {
2010            level_idx: 0,
2011            table_infos: vec![gen_sst_info(
2012                3,
2013                vec![3],
2014                FullKey::for_test(
2015                    TableId::new(3),
2016                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2017                    0,
2018                )
2019                .encode()
2020                .into(),
2021                FullKey::for_test(
2022                    TableId::new(3),
2023                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2024                    0,
2025                )
2026                .encode()
2027                .into(),
2028            )],
2029            sub_level_id: 105,
2030            level_type: LevelType::Nonoverlapping,
2031            total_file_size: 100,
2032            ..Default::default()
2033        });
2034
2035        right_levels.levels[0] = Level {
2036            level_idx: 1,
2037            level_type: LevelType::Nonoverlapping,
2038            table_infos: vec![
2039                gen_sst_info(
2040                    1,
2041                    vec![5],
2042                    FullKey::for_test(
2043                        TableId::new(5),
2044                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2045                        0,
2046                    )
2047                    .encode()
2048                    .into(),
2049                    FullKey::for_test(
2050                        TableId::new(5),
2051                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2052                        0,
2053                    )
2054                    .encode()
2055                    .into(),
2056                ),
2057                gen_sst_info(
2058                    10,
2059                    vec![5, 6],
2060                    FullKey::for_test(
2061                        TableId::new(5),
2062                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2063                        0,
2064                    )
2065                    .encode()
2066                    .into(),
2067                    FullKey::for_test(
2068                        TableId::new(6),
2069                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2070                        0,
2071                    )
2072                    .encode()
2073                    .into(),
2074                ),
2075                gen_sst_info(
2076                    11,
2077                    vec![6],
2078                    FullKey::for_test(
2079                        TableId::new(6),
2080                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2081                        0,
2082                    )
2083                    .encode()
2084                    .into(),
2085                    FullKey::for_test(
2086                        TableId::new(6),
2087                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2088                        0,
2089                    )
2090                    .encode()
2091                    .into(),
2092                ),
2093            ],
2094            total_file_size: 300,
2095            ..Default::default()
2096        };
2097
2098        right_levels.l0.sub_levels.push(Level {
2099            level_idx: 0,
2100            table_infos: vec![gen_sst_info(
2101                3,
2102                vec![5],
2103                FullKey::for_test(
2104                    TableId::new(5),
2105                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2106                    0,
2107                )
2108                .encode()
2109                .into(),
2110                FullKey::for_test(
2111                    TableId::new(5),
2112                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2113                    0,
2114                )
2115                .encode()
2116                .into(),
2117            )],
2118            sub_level_id: 101,
2119            level_type: LevelType::Overlapping,
2120            total_file_size: 100,
2121            ..Default::default()
2122        });
2123
2124        right_levels.l0.sub_levels.push(Level {
2125            level_idx: 0,
2126            table_infos: vec![gen_sst_info(
2127                5,
2128                vec![5],
2129                FullKey::for_test(
2130                    TableId::new(5),
2131                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2132                    0,
2133                )
2134                .encode()
2135                .into(),
2136                FullKey::for_test(
2137                    TableId::new(5),
2138                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2139                    0,
2140                )
2141                .encode()
2142                .into(),
2143            )],
2144            sub_level_id: 102,
2145            level_type: LevelType::Overlapping,
2146            total_file_size: 100,
2147            ..Default::default()
2148        });
2149
2150        right_levels.l0.sub_levels.push(Level {
2151            level_idx: 0,
2152            table_infos: vec![gen_sst_info(
2153                3,
2154                vec![5],
2155                FullKey::for_test(
2156                    TableId::new(5),
2157                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2158                    0,
2159                )
2160                .encode()
2161                .into(),
2162                FullKey::for_test(
2163                    TableId::new(5),
2164                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2165                    0,
2166                )
2167                .encode()
2168                .into(),
2169            )],
2170            sub_level_id: 103,
2171            level_type: LevelType::Nonoverlapping,
2172            total_file_size: 100,
2173            ..Default::default()
2174        });
2175
2176        {
2177            // test merging two empty `Levels`
2178            let mut left_levels = Levels::default();
2179            let right_levels = Levels::default();
2180
2181            group_split::merge_levels(&mut left_levels, right_levels);
2182        }
2183
2184        {
2185            // test empty left
2186            let mut left_levels = build_initial_compaction_group_levels(
2187                1,
2188                &CompactionConfig {
2189                    max_level: 6,
2190                    ..Default::default()
2191                },
2192            );
2193            let right_levels = right_levels.clone();
2194
2195            group_split::merge_levels(&mut left_levels, right_levels);
2196
2197            assert!(left_levels.l0.sub_levels.len() == 3);
2198            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2199            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2200            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2201            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2202            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2203            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2204
2205            assert!(left_levels.levels[0].level_idx == 1);
2206            assert_eq!(300, left_levels.levels[0].total_file_size);
2207        }
2208
2209        {
2210            // test empty right
2211            let mut left_levels = left_levels.clone();
2212            let right_levels = build_initial_compaction_group_levels(
2213                2,
2214                &CompactionConfig {
2215                    max_level: 6,
2216                    ..Default::default()
2217                },
2218            );
2219
2220            group_split::merge_levels(&mut left_levels, right_levels);
2221
2222            assert!(left_levels.l0.sub_levels.len() == 3);
2223            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2224            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2225            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2226            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2227            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2228            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2229
2230            assert!(left_levels.levels[0].level_idx == 1);
2231            assert_eq!(300, left_levels.levels[0].total_file_size);
2232        }
2233
2234        {
2235            let mut left_levels = left_levels.clone();
2236            let right_levels = right_levels.clone();
2237
2238            group_split::merge_levels(&mut left_levels, right_levels);
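            // Merging appends the right group's sub levels behind the left group's; their
            // sub_level_ids are re-assigned to 106, 107 and 108, after the left group's 101, 103, 105.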
2239
2240            assert!(left_levels.l0.sub_levels.len() == 6);
2241            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2242            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2243            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2244            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2245            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2246            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2247            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2248            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2249            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2250            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2251            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2252            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2253
2254            assert!(left_levels.levels[0].level_idx == 1);
2255            assert_eq!(600, left_levels.levels[0].total_file_size);
2256        }
2257    }
2258
2259    #[test]
2260    fn test_get_split_pos() {
2261        let epoch = test_epoch(1);
2262        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2263        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2264        let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2265
2266        let ssts = vec![s1, s2, s3];
2267        let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2268
2269        let pos = group_split::get_split_pos(&ssts, split_key.clone());
2270        assert_eq!(1, pos);
2271
2272        let pos = group_split::get_split_pos(&vec![], split_key);
2273        assert_eq!(0, pos);
2274    }
2275
2276    #[test]
2277    fn test_split_sst() {
2278        let epoch = test_epoch(1);
2279        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2280
2281        {
2282            let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
2283            let origin_sst = sst.clone();
2284            let sst_size = origin_sst.sst_size;
2285            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2286            assert_eq!(SstSplitType::Both, split_type);
2287
2288            let mut new_sst_id = 10.into();
2289            let (origin_sst, branched_sst) = group_split::split_sst(
2290                origin_sst,
2291                &mut new_sst_id,
2292                split_key,
2293                sst_size / 2,
2294                sst_size / 2,
2295            );
2296
2297            let origin_sst = origin_sst.unwrap();
2298            let branched_sst = branched_sst.unwrap();
2299
2300            assert!(origin_sst.key_range.right_exclusive);
2301            assert!(
2302                origin_sst
2303                    .key_range
2304                    .right
2305                    .cmp(&branched_sst.key_range.left)
2306                    .is_le()
2307            );
2308            assert!(origin_sst.table_ids.is_sorted());
2309            assert!(branched_sst.table_ids.is_sorted());
2310            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2311            assert!(branched_sst.sst_size < origin_sst.file_size);
2312            assert_eq!(10, branched_sst.sst_id);
2313            assert_eq!(11, origin_sst.sst_id);
2314            assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2315        }
2316
2317        {
2318            // test a table_id that does not exist in the sst
2319            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2320            let origin_sst = sst.clone();
2321            let sst_size = origin_sst.sst_size;
2322            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2323            assert_eq!(SstSplitType::Both, split_type);
2324
2325            let mut new_sst_id = 10.into();
2326            let (origin_sst, branched_sst) = group_split::split_sst(
2327                origin_sst,
2328                &mut new_sst_id,
2329                split_key,
2330                sst_size / 2,
2331                sst_size / 2,
2332            );
2333
2334            let origin_sst = origin_sst.unwrap();
2335            let branched_sst = branched_sst.unwrap();
2336
2337            assert!(origin_sst.key_range.right_exclusive);
2338            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2339            assert!(origin_sst.table_ids.is_sorted());
2340            assert!(branched_sst.table_ids.is_sorted());
2341            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2342            assert!(branched_sst.sst_size < origin_sst.file_size);
2343            assert_eq!(10, branched_sst.sst_id);
2344            assert_eq!(11, origin_sst.sst_id);
2345            assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id()); // split table_id to right
2346        }
2347
2348        {
2349            let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
2350            let split_type = group_split::need_to_split(&sst, split_key);
2351            assert_eq!(SstSplitType::Left, split_type);
2352        }
2353
2354        {
2355            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2356            let origin_sst = sst.clone();
2357            let split_type = group_split::need_to_split(&origin_sst, split_key);
2358            assert_eq!(SstSplitType::Both, split_type);
2359
2360            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2361            let origin_sst = sst;
2362            let split_type = group_split::need_to_split(&origin_sst, split_key);
2363            assert_eq!(SstSplitType::Right, split_type);
2364        }
2365
2366        {
2367            // test key_range left = right
2368            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2369            sst.key_range.right = sst.key_range.left.clone();
2370            let sst: SstableInfo = sst.into();
2371            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2372            let origin_sst = sst;
2373            let sst_size = origin_sst.sst_size;
2374
2375            let mut new_sst_id = 10.into();
2376            let (origin_sst, branched_sst) = group_split::split_sst(
2377                origin_sst,
2378                &mut new_sst_id,
2379                split_key,
2380                sst_size / 2,
2381                sst_size / 2,
2382            );
2383
2384            assert!(origin_sst.is_none());
2385            assert!(branched_sst.is_some());
2386        }
2387    }
2388
2389    #[test]
2390    fn test_split_sst_info_for_level() {
2391        let mut version = HummockVersion {
2392            id: HummockVersionId(0),
2393            levels: HashMap::from_iter([(
2394                1,
2395                build_initial_compaction_group_levels(
2396                    1,
2397                    &CompactionConfig {
2398                        max_level: 6,
2399                        ..Default::default()
2400                    },
2401                ),
2402            )]),
2403            ..Default::default()
2404        };
2405
2406        let cg1 = version.levels.get_mut(&1).unwrap();
2407
2408        cg1.levels[0] = Level {
2409            level_idx: 1,
2410            level_type: LevelType::Nonoverlapping,
2411            table_infos: vec![
2412                gen_sst_info(
2413                    1,
2414                    vec![3],
2415                    FullKey::for_test(
2416                        TableId::new(3),
2417                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2418                        0,
2419                    )
2420                    .encode()
2421                    .into(),
2422                    FullKey::for_test(
2423                        TableId::new(3),
2424                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2425                        0,
2426                    )
2427                    .encode()
2428                    .into(),
2429                ),
2430                gen_sst_info(
2431                    10,
2432                    vec![3, 4],
2433                    FullKey::for_test(
2434                        TableId::new(3),
2435                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2436                        0,
2437                    )
2438                    .encode()
2439                    .into(),
2440                    FullKey::for_test(
2441                        TableId::new(4),
2442                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2443                        0,
2444                    )
2445                    .encode()
2446                    .into(),
2447                ),
2448                gen_sst_info(
2449                    11,
2450                    vec![4],
2451                    FullKey::for_test(
2452                        TableId::new(4),
2453                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2454                        0,
2455                    )
2456                    .encode()
2457                    .into(),
2458                    FullKey::for_test(
2459                        TableId::new(4),
2460                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2461                        0,
2462                    )
2463                    .encode()
2464                    .into(),
2465                ),
2466            ],
2467            total_file_size: 300,
2468            ..Default::default()
2469        };
2470
2471        cg1.l0.sub_levels.push(Level {
2472            level_idx: 0,
2473            table_infos: vec![
2474                gen_sst_info(
2475                    2,
2476                    vec![2],
2477                    FullKey::for_test(
2478                        TableId::new(0),
2479                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2480                        0,
2481                    )
2482                    .encode()
2483                    .into(),
2484                    FullKey::for_test(
2485                        TableId::new(2),
2486                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2487                        0,
2488                    )
2489                    .encode()
2490                    .into(),
2491                ),
2492                gen_sst_info(
2493                    22,
2494                    vec![2],
2495                    FullKey::for_test(
2496                        TableId::new(0),
2497                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2498                        0,
2499                    )
2500                    .encode()
2501                    .into(),
2502                    FullKey::for_test(
2503                        TableId::new(2),
2504                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2505                        0,
2506                    )
2507                    .encode()
2508                    .into(),
2509                ),
2510                gen_sst_info(
2511                    23,
2512                    vec![2],
2513                    FullKey::for_test(
2514                        TableId::new(0),
2515                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2516                        0,
2517                    )
2518                    .encode()
2519                    .into(),
2520                    FullKey::for_test(
2521                        TableId::new(2),
2522                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2523                        0,
2524                    )
2525                    .encode()
2526                    .into(),
2527                ),
2528                gen_sst_info(
2529                    24,
2530                    vec![2],
2531                    FullKey::for_test(
2532                        TableId::new(2),
2533                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2534                        0,
2535                    )
2536                    .encode()
2537                    .into(),
2538                    FullKey::for_test(
2539                        TableId::new(2),
2540                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2541                        0,
2542                    )
2543                    .encode()
2544                    .into(),
2545                ),
2546                gen_sst_info(
2547                    25,
2548                    vec![2],
2549                    FullKey::for_test(
2550                        TableId::new(0),
2551                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2552                        0,
2553                    )
2554                    .encode()
2555                    .into(),
2556                    FullKey::for_test(
2557                        TableId::new(0),
2558                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2559                        0,
2560                    )
2561                    .encode()
2562                    .into(),
2563                ),
2564            ],
2565            sub_level_id: 101,
2566            level_type: LevelType::Overlapping,
2567            total_file_size: 300,
2568            ..Default::default()
2569        });
2570
2571        {
2572            // split Overlapping level
2573            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2574
2575            let mut new_sst_id = 100.into();
2576            let x = group_split::split_sst_info_for_level_v2(
2577                &mut cg1.l0.sub_levels[0],
2578                &mut new_sst_id,
2579                split_key,
2580            );
2581            // assert_eq!(3, x.len());
2582            // assert_eq!(100, x[0].sst_id);
2583            // assert_eq!(100, x[0].sst_size);
2584            // assert_eq!(101, x[1].sst_id);
2585            // assert_eq!(100, x[1].sst_size);
2586            // assert_eq!(102, x[2].sst_id);
2587            // assert_eq!(100, x[2].sst_size);
2588
2589            let mut right_l0 = OverlappingLevel {
2590                sub_levels: vec![],
2591                total_file_size: 0,
2592                uncompressed_file_size: 0,
2593            };
2594
2595            right_l0.sub_levels.push(Level {
2596                level_idx: 0,
2597                table_infos: x,
2598                sub_level_id: 101,
2599                total_file_size: 100,
2600                level_type: LevelType::Overlapping,
2601                ..Default::default()
2602            });
2603
2604            let right_levels = Levels {
2605                levels: vec![],
2606                l0: right_l0,
2607                ..Default::default()
2608            };
2609
2610            merge_levels(cg1, right_levels);
2611        }
2612
2613        {
2614            // test split empty level
2615            let mut new_sst_id = 100.into();
2616            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2617            let x = group_split::split_sst_info_for_level_v2(
2618                &mut cg1.levels[2],
2619                &mut new_sst_id,
2620                split_key,
2621            );
2622
2623            assert!(x.is_empty());
2624        }
2625
2626        {
2627            // test split to right Nonoverlapping level
2628            let mut cg1 = cg1.clone();
2629            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2630
2631            let mut new_sst_id = 100.into();
2632            let x = group_split::split_sst_info_for_level_v2(
2633                &mut cg1.levels[0],
2634                &mut new_sst_id,
2635                split_key,
2636            );
2637
2638            assert_eq!(3, x.len());
2639            assert_eq!(1, x[0].sst_id);
2640            assert_eq!(100, x[0].sst_size);
2641            assert_eq!(10, x[1].sst_id);
2642            assert_eq!(100, x[1].sst_size);
2643            assert_eq!(11, x[2].sst_id);
2644            assert_eq!(100, x[2].sst_size);
2645
2646            assert_eq!(0, cg1.levels[0].table_infos.len());
2647        }
2648
2649        {
2650            // test split to left Nonoverlapping level
2651            let mut cg1 = cg1.clone();
2652            let split_key = group_split::build_split_key(5.into(), VirtualNode::ZERO);
2653
2654            let mut new_sst_id = 100.into();
2655            let x = group_split::split_sst_info_for_level_v2(
2656                &mut cg1.levels[0],
2657                &mut new_sst_id,
2658                split_key,
2659            );
2660
2661            assert_eq!(0, x.len());
2662            assert_eq!(3, cg1.levels[0].table_infos.len());
2663        }
2664
2665        // {
2666        //     // test split to both Nonoverlapping level
2667        //     let mut cg1 = cg1.clone();
2668        //     let split_key = build_split_key(3, VirtualNode::MAX);
2669
2670        //     let mut new_sst_id = 100;
2671        //     let x = group_split::split_sst_info_for_level_v2(
2672        //         &mut cg1.levels[0],
2673        //         &mut new_sst_id,
2674        //         split_key,
2675        //     );
2676
2677        //     assert_eq!(2, x.len());
2678        //     assert_eq!(100, x[0].sst_id);
2679        //     assert_eq!(100 / 2, x[0].sst_size);
2680        //     assert_eq!(11, x[1].sst_id);
2681        //     assert_eq!(100, x[1].sst_size);
2682        //     assert_eq!(vec![3, 4], x[0].table_ids);
2683
2684        //     assert_eq!(2, cg1.levels[0].table_infos.len());
2685        //     assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2686        //     assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2687        //     assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2688        // }
2689
2690        {
2691            // test split to both Nonoverlapping level
2692            let mut cg1 = cg1.clone();
2693            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2694
2695            let mut new_sst_id = 100.into();
2696            let x = group_split::split_sst_info_for_level_v2(
2697                &mut cg1.levels[0],
2698                &mut new_sst_id,
2699                split_key,
2700            );
2701
2702            assert_eq!(2, x.len());
2703            assert_eq!(100, x[0].sst_id);
2704            assert_eq!(100 / 2, x[0].sst_size);
2705            assert_eq!(11, x[1].sst_id);
2706            assert_eq!(100, x[1].sst_size);
2707            assert_eq!(vec![TableId::new(4)], x[1].table_ids);
2708
2709            assert_eq!(2, cg1.levels[0].table_infos.len());
2710            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2711            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2712            assert_eq!(
2713                vec![TableId::new(3)],
2714                cg1.levels[0].table_infos[1].table_ids
2715            );
2716        }
2717    }
2718}