risingwave_hummock_sdk/compaction_group/
hummock_version_ext.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_pb::hummock::{
27    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
28};
29use tracing::warn;
30
31use super::group_split::split_sst_with_table_ids;
32use super::{StateTableId, group_split};
33use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
34use crate::compact_task::is_compaction_task_expired;
35use crate::compaction_group::StaticCompactionGroupId;
36use crate::key_range::KeyRangeCommon;
37use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
38use crate::sstable_info::SstableInfo;
39use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
40use crate::version::{
41    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
42    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
43};
44use crate::{
45    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
46};
47
/// Per-compaction-group summary of the SSTs inserted and deleted by one version delta, as
/// produced by `HummockVersionCommon::build_sst_delta_infos`.
48#[derive(Debug, Clone, Default)]
49pub struct SstDeltaInfo {
    /// Level index the SSTs in `insert_sst_infos` were inserted into (0 for L0). If a delta
    /// inserts into several levels, the last non-empty insertion determines this value.
50    pub insert_sst_level: u32,
    /// SSTs added by the delta (cloned from the delta).
51    pub insert_sst_infos: Vec<SstableInfo>,
    /// Object ids of the SSTs removed by the delta, resolved by looking the removed SST ids
    /// up in the version `build_sst_delta_infos` is called on.
52    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
53}
54
/// Map from compaction group id to a list of SST ids.
/// NOTE(review): not used in this part of the file; presumably records in which groups a
/// branched (split) SST lives — confirm against callers.
55pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
56
57impl<L> HummockVersionCommon<SstableInfo, L> {
58    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
59        self.levels
60            .get(&compaction_group_id)
61            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
62    }
63
64    pub fn get_compaction_group_levels_mut(
65        &mut self,
66        compaction_group_id: CompactionGroupId,
67    ) -> &mut Levels {
68        self.levels
69            .get_mut(&compaction_group_id)
70            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
71    }
72
73    // only scan the sst infos from levels in the specified compaction group (without table change log)
74    pub fn get_sst_ids_by_group_id(
75        &self,
76        compaction_group_id: CompactionGroupId,
77    ) -> impl Iterator<Item = HummockSstableId> + '_ {
78        self.levels
79            .iter()
80            .filter_map(move |(cg_id, level)| {
81                if *cg_id == compaction_group_id {
82                    Some(level)
83                } else {
84                    None
85                }
86            })
87            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
88            .flat_map(|level| level.table_infos.iter())
89            .map(|s| s.sst_id)
90    }
91
92    pub fn level_iter<F: FnMut(&Level) -> bool>(
93        &self,
94        compaction_group_id: CompactionGroupId,
95        mut f: F,
96    ) {
97        if let Some(levels) = self.levels.get(&compaction_group_id) {
98            for sub_level in &levels.l0.sub_levels {
99                if !f(sub_level) {
100                    return;
101                }
102            }
103            for level in &levels.levels {
104                if !f(level) {
105                    return;
106                }
107            }
108        }
109    }
110
111    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
112        // l0 is currently separated from all levels
113        self.levels
114            .get(&compaction_group_id)
115            .map(|group| group.levels.len() + 1)
116            .unwrap_or(0)
117    }
118
119    pub fn safe_epoch_table_watermarks(
120        &self,
121        existing_table_ids: &[u32],
122    ) -> BTreeMap<u32, TableWatermarks> {
123        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
124    }
125}
126
127pub fn safe_epoch_table_watermarks_impl(
128    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
129    existing_table_ids: &[u32],
130) -> BTreeMap<u32, TableWatermarks> {
131    fn extract_single_table_watermark(
132        table_watermarks: &TableWatermarks,
133    ) -> Option<TableWatermarks> {
134        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
135            Some(TableWatermarks {
136                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
137                direction: table_watermarks.direction,
138                watermark_type: table_watermarks.watermark_type,
139            })
140        } else {
141            None
142        }
143    }
144    table_watermarks
145        .iter()
146        .filter_map(|(table_id, table_watermarks)| {
147            let u32_table_id = table_id.table_id();
148            if !existing_table_ids.contains(&u32_table_id) {
149                None
150            } else {
151                extract_single_table_watermark(table_watermarks)
152                    .map(|table_watermarks| (table_id.table_id, table_watermarks))
153            }
154        })
155        .collect()
156}
157
158pub fn safe_epoch_read_table_watermarks_impl(
159    safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
160) -> BTreeMap<TableId, ReadTableWatermark> {
161    safe_epoch_watermarks
162        .into_iter()
163        .map(|(table_id, watermarks)| {
164            assert_eq!(watermarks.watermarks.len(), 1);
165            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
166            let mut vnode_watermark_map = BTreeMap::new();
167            for vnode_watermark in vnode_watermarks.iter() {
168                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
169                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
170                    assert!(
171                        vnode_watermark_map
172                            .insert(vnode, watermark.clone())
173                            .is_none(),
174                        "duplicate table watermark on vnode {}",
175                        vnode.to_index()
176                    );
177                }
178            }
179            (
180                TableId::from(table_id),
181                ReadTableWatermark {
182                    direction: watermarks.direction,
183                    vnode_watermarks: vnode_watermark_map,
184                },
185            )
186        })
187        .collect()
188}
189
190impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
191    pub fn count_new_ssts_in_group_split(
192        &self,
193        parent_group_id: CompactionGroupId,
194        split_key: Bytes,
195    ) -> u64 {
196        self.levels
197            .get(&parent_group_id)
198            .map_or(0, |parent_levels| {
199                let l0 = &parent_levels.l0;
200                let mut split_count = 0;
201                for sub_level in &l0.sub_levels {
202                    assert!(!sub_level.table_infos.is_empty());
203
204                    if sub_level.level_type == PbLevelType::Overlapping {
205                        // TODO: use table_id / vnode / key_range filter
206                        split_count += sub_level
207                            .table_infos
208                            .iter()
209                            .map(|sst| {
210                                if let group_split::SstSplitType::Both =
211                                    group_split::need_to_split(sst, split_key.clone())
212                                {
213                                    2
214                                } else {
215                                    0
216                                }
217                            })
218                            .sum::<u64>();
219                        continue;
220                    }
221
222                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
223                    let sst = sub_level.table_infos.get(pos).unwrap();
224
225                    if let group_split::SstSplitType::Both =
226                        group_split::need_to_split(sst, split_key.clone())
227                    {
228                        split_count += 2;
229                    }
230                }
231
232                for level in &parent_levels.levels {
233                    if level.table_infos.is_empty() {
234                        continue;
235                    }
236                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
237                    let sst = level.table_infos.get(pos).unwrap();
238                    if let group_split::SstSplitType::Both =
239                        group_split::need_to_split(sst, split_key.clone())
240                    {
241                        split_count += 2;
242                    }
243                }
244
245                split_count
246            })
247    }
248
    /// Legacy group-split path: populates the newly constructed group `group_id` with the
    /// SSTs of `parent_group_id` that belong to `member_table_ids`.
    ///
    /// `apply_version_delta` only takes this path for `GroupConstruct` deltas older than
    /// `CompatibilityVersion::SplitGroupByTableId`; newer deltas use
    /// `init_with_parent_group_v2`. Per-level splitting is delegated to
    /// `split_sst_info_for_level`. That helper is not visible here; presumably it strips
    /// `member_table_ids` from the parent's SSTs and returns the SSTs to insert into the new
    /// group, drawing new ids from `new_sst_id`. Parent SSTs left with no table ids are then
    /// removed, and the parent's size counters are reduced accordingly.
    ///
    /// Does nothing when `parent_group_id` is `NewCompactionGroup`, because there is no
    /// parent to split.
    ///
    /// # Panics
    ///
    /// - In debug builds, if `parent_group_id` is `NewCompactionGroup` and `new_sst_start_id`
    ///   is non-zero (release builds only log a warning).
    /// - If `parent_group_id` does not exist, or `group_id` has no levels entry yet.
    /// - If `parent_group_id == group_id` (`HashMap::get_disjoint_mut` rejects overlapping keys).
    /// - If a level of the new group is not concatenable after insertion, or if either group
    ///   ends up with an empty L0 sub-level.
249    pub fn init_with_parent_group(
250        &mut self,
251        parent_group_id: CompactionGroupId,
252        group_id: CompactionGroupId,
253        member_table_ids: BTreeSet<StateTableId>,
254        new_sst_start_id: HummockSstableId,
255    ) {
256        let mut new_sst_id = new_sst_start_id;
        // A group built from scratch has no parent SSTs to split.
257        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
258            if new_sst_start_id != 0 {
259                if cfg!(debug_assertions) {
260                    panic!(
261                        "non-zero sst start id {} for NewCompactionGroup",
262                        new_sst_start_id
263                    );
264                } else {
265                    warn!(
266                        %new_sst_start_id,
267                        "non-zero sst start id for NewCompactionGroup"
268                    );
269                }
270            }
271            return;
272        } else if !self.levels.contains_key(&parent_group_id) {
273            unreachable!(
274                "non-existing parent group id {} to init from",
275                parent_group_id
276            );
277        }
        // Borrow both groups mutably at once; the `unwrap` panics if `group_id` has not been
        // inserted into `levels` yet.
278        let [parent_levels, cur_levels] = self
279            .levels
280            .get_disjoint_mut([&parent_group_id, &group_id])
281            .map(|res| res.unwrap());
282        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
283        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
284        parent_levels.compaction_group_version_id += 1;
285        cur_levels.compaction_group_version_id += 1;
        // L0: split every sub-level and place the split-off SSTs into the new group's L0.
286        let l0 = &mut parent_levels.l0;
287        {
288            for sub_level in &mut l0.sub_levels {
289                let target_l0 = &mut cur_levels.l0;
290                // Removing SSTs from a sub-level may leave it empty; empty sub-levels are
291                // purged by the `retain` right after this loop.
292                let insert_table_infos =
293                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
                // Drop parent SSTs whose table ids were all moved out, keeping both the
                // sub-level and the L0 size counters in sync.
294                sub_level
295                    .table_infos
296                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
297                    .for_each(|sst_info| {
298                        sub_level.total_file_size -= sst_info.sst_size;
299                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
300                        l0.total_file_size -= sst_info.sst_size;
301                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
302                    });
303                if insert_table_infos.is_empty() {
304                    continue;
305                }
                // `Ok(idx)`: add to the new group's sub-level at `idx`. `Err(idx)`: create a
                // sub-level with the parent's id and type at `idx` (presumably keeping
                // sub-levels ordered; see `get_sub_level_insert_hint`).
306                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
307                    Ok(idx) => {
308                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
309                    }
310                    Err(idx) => {
311                        insert_new_sub_level(
312                            target_l0,
313                            sub_level.sub_level_id,
314                            sub_level.level_type,
315                            insert_table_infos,
316                            Some(idx),
317                        );
318                    }
319                }
320            }
321
322            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
323        }
        // L1..Ln: move split-off SSTs into the same level index of the new group, keeping
        // that level sorted by key range so it stays concatenable.
324        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
325            let insert_table_infos =
326                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
327            cur_levels.levels[idx].total_file_size += insert_table_infos
328                .iter()
329                .map(|sst| sst.sst_size)
330                .sum::<u64>();
331            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
332                .iter()
333                .map(|sst| sst.uncompressed_file_size)
334                .sum::<u64>();
335            cur_levels.levels[idx]
336                .table_infos
337                .extend(insert_table_infos);
338            cur_levels.levels[idx]
339                .table_infos
340                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
341            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            // Drop parent SSTs that no longer contain any table, updating the level size.
342            level
343                .table_infos
344                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
345                .for_each(|sst_info| {
346                    level.total_file_size -= sst_info.sst_size;
347                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
348                });
349        }
350
        // Neither group may be left with an empty L0 sub-level.
351        assert!(
352            parent_levels
353                .l0
354                .sub_levels
355                .iter()
356                .all(|level| !level.table_infos.is_empty())
357        );
358        assert!(
359            cur_levels
360                .l0
361                .sub_levels
362                .iter()
363                .all(|level| !level.table_infos.is_empty())
364        );
365    }
366
367    pub fn build_sst_delta_infos(
368        &self,
369        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
370    ) -> Vec<SstDeltaInfo> {
371        let mut infos = vec![];
372
373        // Skip trivial move delta for refiller
374        // The trivial move task only changes the position of the sst in the lsm, it does not modify the object information corresponding to the sst, and does not need to re-execute the refill.
375        if version_delta.trivial_move {
376            return infos;
377        }
378
379        for (group_id, group_deltas) in &version_delta.group_deltas {
380            let mut info = SstDeltaInfo::default();
381
382            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
383            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
384
385            // Build only if all deltas are intra level deltas.
386            if !group_deltas.group_deltas.iter().all(|delta| {
387                matches!(
388                    delta,
389                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
390                )
391            }) {
392                continue;
393            }
394
395            // TODO(MrCroxx): At most one insert delta is allowed here. It's okay for now with the
396            // current `hummock::manager::gen_version_delta` implementation. Better refactor the
397            // struct to reduce conventions.
398            for group_delta in &group_deltas.group_deltas {
399                match group_delta {
400                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
401                        if !inserted_table_infos.is_empty() {
402                            info.insert_sst_level = 0;
403                            info.insert_sst_infos
404                                .extend(inserted_table_infos.iter().cloned());
405                        }
406                    }
407                    GroupDeltaCommon::IntraLevel(intra_level) => {
408                        if !intra_level.inserted_table_infos.is_empty() {
409                            info.insert_sst_level = intra_level.level_idx;
410                            info.insert_sst_infos
411                                .extend(intra_level.inserted_table_infos.iter().cloned());
412                        }
413                        if !intra_level.removed_table_ids.is_empty() {
414                            for id in &intra_level.removed_table_ids {
415                                if intra_level.level_idx == 0 {
416                                    removed_l0_ssts.insert(*id);
417                                } else {
418                                    removed_ssts
419                                        .entry(intra_level.level_idx)
420                                        .or_default()
421                                        .insert(*id);
422                                }
423                            }
424                        }
425                    }
426                    GroupDeltaCommon::GroupConstruct(_)
427                    | GroupDeltaCommon::GroupDestroy(_)
428                    | GroupDeltaCommon::GroupMerge(_) => {}
429                }
430            }
431
432            let group = self.levels.get(group_id).unwrap();
433            for l0_sub_level in &group.level0().sub_levels {
434                for sst_info in &l0_sub_level.table_infos {
435                    if removed_l0_ssts.remove(&sst_info.sst_id) {
436                        info.delete_sst_object_ids.push(sst_info.object_id);
437                    }
438                }
439            }
440            for level in &group.levels {
441                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
442                    for sst_info in &level.table_infos {
443                        if removed_level_ssts.remove(&sst_info.sst_id) {
444                            info.delete_sst_object_ids.push(sst_info.object_id);
445                        }
446                    }
447                    if !removed_level_ssts.is_empty() {
448                        tracing::error!(
449                            "removed_level_ssts is not empty: {:?}",
450                            removed_level_ssts,
451                        );
452                    }
453                    debug_assert!(removed_level_ssts.is_empty());
454                }
455            }
456
457            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
458                tracing::error!(
459                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
460                    removed_l0_ssts,
461                    removed_ssts
462                );
463            }
464            debug_assert!(removed_l0_ssts.is_empty());
465            debug_assert!(removed_ssts.is_empty());
466
467            infos.push(info);
468        }
469
470        infos
471    }
472
    /// Applies `version_delta` to this version in place and advances `self.id` to
    /// `version_delta.id`.
    ///
    /// Steps, in order:
    /// 1. Apply the state-table-info delta, which also reports whether this delta commits an
    ///    epoch.
    /// 2. Apply every compaction group's deltas: group construct / merge / destroy, new L0
    ///    sub-levels from an epoch commit, and compaction results.
    /// 3. Update `table_watermarks`:
    ///    - merge newly reported watermarks;
    ///    - drop the watermarks of removed tables that also appear in `new_table_watermarks`;
    ///    - truncate stale epochs once a table's committed epoch has passed its oldest
    ///      watermark epoch.
    /// 4. Apply the table change log delta.
    ///
    /// # Panics
    ///
    /// - If `version_delta.prev_id != self.id`.
    /// - If an `IntraLevel` or `NewL0SubLevel` delta targets a group missing from `self.levels`.
    /// - If an epoch-commit `IntraLevel` delta removes SSTs or targets a level other than L0.
    /// - If a `NewL0SubLevel` delta appears in a delta that is not an epoch commit.
473    pub fn apply_version_delta(
474        &mut self,
475        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
476    ) {
477        assert_eq!(self.id, version_delta.prev_id);
478
        // `changed_table_info` is handed to `apply_change_log_delta` at the end, which reads it
        // as the previous info of each changed table.
479        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
480            &version_delta.state_table_info_delta,
481            &version_delta.removed_table_ids,
482        );
483
        // Backward compatibility: a delta may bump the deprecated `max_committed_epoch` without
        // changing any table's committed epoch; it is still treated as an epoch commit.
484        #[expect(deprecated)]
485        {
486            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
487                is_commit_epoch = true;
488                tracing::trace!(
489                    "max committed epoch bumped but no table committed epoch is changed"
490                );
491            }
492        }
493
494        // apply to `levels`, one entry per compaction group
495        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
496            let mut is_applied_l0_compact = false;
497            for group_delta in &group_deltas.group_deltas {
498                match group_delta {
499                    GroupDeltaCommon::GroupConstruct(group_construct) => {
500                        let mut new_levels = build_initial_compaction_group_levels(
501                            *compaction_group_id,
502                            group_construct.get_group_config().unwrap(),
503                        );
504                        let parent_group_id = group_construct.parent_group_id;
505                        new_levels.parent_group_id = parent_group_id;
506                        #[expect(deprecated)]
507                        // for backward-compatibility of previous hummock version delta
508                        new_levels
509                            .member_table_ids
510                            .clone_from(&group_construct.table_ids);
                        // Register the new group before initializing it from its parent; the
                        // init functions look it up in `self.levels`.
511                        self.levels.insert(*compaction_group_id, new_levels);
                        // Only consumed by the legacy `init_with_parent_group` path below.
512                        let member_table_ids = if group_construct.version()
513                            >= CompatibilityVersion::NoMemberTableIds
514                        {
515                            self.state_table_info
516                                .compaction_group_member_table_ids(*compaction_group_id)
517                                .iter()
518                                .map(|table_id| table_id.table_id)
519                                .collect()
520                        } else {
521                            #[expect(deprecated)]
522                            // for backward-compatibility of previous hummock version delta
523                            BTreeSet::from_iter(group_construct.table_ids.clone())
524                        };
525
526                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
527                            let split_key = if group_construct.split_key.is_some() {
528                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
529                            } else {
530                                None
531                            };
532                            self.init_with_parent_group_v2(
533                                parent_group_id,
534                                *compaction_group_id,
535                                group_construct.get_new_sst_start_id().into(),
536                                split_key.clone(),
537                            );
538                        } else {
539                            // for backward-compatibility of previous hummock version delta
540                            self.init_with_parent_group(
541                                parent_group_id,
542                                *compaction_group_id,
543                                member_table_ids,
544                                group_construct.get_new_sst_start_id().into(),
545                            );
546                        }
547                    }
548                    GroupDeltaCommon::GroupMerge(group_merge) => {
549                        tracing::info!(
550                            "group_merge left {:?} right {:?}",
551                            group_merge.left_group_id,
552                            group_merge.right_group_id
553                        );
554                        self.merge_compaction_group(
555                            group_merge.left_group_id,
556                            group_merge.right_group_id,
557                        )
558                    }
559                    GroupDeltaCommon::IntraLevel(level_delta) => {
560                        let levels =
561                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
562                                panic!("compaction group {} does not exist", compaction_group_id)
563                            });
                        // An epoch commit may only append a new L0 sub-level; any other
                        // intra-level delta is the result of a compaction.
564                        if is_commit_epoch {
565                            assert!(
566                                level_delta.removed_table_ids.is_empty(),
567                                "no sst should be deleted when committing an epoch"
568                            );
569
570                            let IntraLevelDelta {
571                                level_idx,
572                                l0_sub_level_id,
573                                inserted_table_infos,
574                                ..
575                            } = level_delta;
576                            {
577                                assert_eq!(
578                                    *level_idx, 0,
579                                    "we should only add to L0 when we commit an epoch."
580                                );
581                                if !inserted_table_infos.is_empty() {
582                                    insert_new_sub_level(
583                                        &mut levels.l0,
584                                        *l0_sub_level_id,
585                                        PbLevelType::Overlapping,
586                                        inserted_table_infos.clone(),
587                                        None,
588                                    );
589                                }
590                            }
591                        } else {
592                            // The delta is caused by compaction.
593                            levels.apply_compact_ssts(
594                                level_delta,
595                                self.state_table_info
596                                    .compaction_group_member_table_ids(*compaction_group_id),
597                            );
598                            if level_delta.level_idx == 0 {
599                                is_applied_l0_compact = true;
600                            }
601                        }
602                    }
603                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
604                        let levels =
605                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
606                                panic!("compaction group {} does not exist", compaction_group_id)
607                            });
608                        assert!(is_commit_epoch);
609
610                        if !inserted_table_infos.is_empty() {
                            // The new sub-level id follows the current last sub-level, starting
                            // at 1 when L0 has no sub-levels.
611                            let next_l0_sub_level_id = levels
612                                .l0
613                                .sub_levels
614                                .last()
615                                .map(|level| level.sub_level_id + 1)
616                                .unwrap_or(1);
617
618                            insert_new_sub_level(
619                                &mut levels.l0,
620                                next_l0_sub_level_id,
621                                PbLevelType::Overlapping,
622                                inserted_table_infos.clone(),
623                                None,
624                            );
625                        }
626                    }
627                    GroupDeltaCommon::GroupDestroy(_) => {
628                        self.levels.remove(compaction_group_id);
629                    }
630                }
631            }
            // Post-process L0 once per group, after all of its deltas are applied, and only if
            // an L0 compaction was applied and the group still exists.
632            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
633            {
634                levels.post_apply_l0_compact();
635            }
636        }
637        self.id = version_delta.id;
638        #[expect(deprecated)]
639        {
640            self.max_committed_epoch = version_delta.max_committed_epoch;
641        }
642
643        // apply to table watermark
644
645        // Stage the table watermarks that need to be updated. `None` means removing the table watermark of the table id
646        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
647            HashMap::new();
648
649        // merge the watermarks newly reported by this delta
        // NOTE(review): a table in `removed_table_ids` only has its watermarks dropped here if it
        // also appears in `new_table_watermarks` and already has watermarks. A removed table
        // without new watermarks is skipped by the truncation loop below, presumably because
        // `apply_delta` removed its `state_table_info` entry. A removed table with new
        // watermarks but no existing ones gets them inserted. Confirm this is intended or
        // handled elsewhere.
650        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
651            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
652                if version_delta.removed_table_ids.contains(table_id) {
653                    modified_table_watermarks.insert(*table_id, None);
654                } else {
655                    let mut current_table_watermarks = (**current_table_watermarks).clone();
656                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
657                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
658                }
659            } else {
660                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
661            }
662        }
        // Truncate each table's watermarks once its committed epoch has moved past the oldest
        // watermark epoch, reusing a staged copy if one was produced above.
663        for (table_id, table_watermarks) in &self.table_watermarks {
664            let safe_epoch = if let Some(state_table_info) =
665                self.state_table_info.info().get(table_id)
666                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
667                && state_table_info.committed_epoch > *oldest_epoch
668            {
669                // safe epoch has progressed, need further clear.
670                state_table_info.committed_epoch
671            } else {
672                // safe epoch not progressed or the table has been removed. No need to truncate
673                continue;
674            };
675            let table_watermarks = modified_table_watermarks
676                .entry(*table_id)
677                .or_insert_with(|| Some((**table_watermarks).clone()));
678            if let Some(table_watermarks) = table_watermarks {
679                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
680            }
681        }
682        // apply the staging table watermark to hummock version
683        for (table_id, table_watermarks) in modified_table_watermarks {
684            if let Some(table_watermarks) = table_watermarks {
685                self.table_watermarks
686                    .insert(table_id, Arc::new(table_watermarks));
687            } else {
688                self.table_watermarks.remove(&table_id);
689            }
690        }
691
692        // apply to table change log
693        Self::apply_change_log_delta(
694            &mut self.table_change_log,
695            &version_delta.change_log_delta,
696            &version_delta.removed_table_ids,
697            &version_delta.state_table_info_delta,
698            &changed_table_info,
699        );
700    }
701
702    pub fn apply_change_log_delta<T: Clone>(
703        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
704        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
705        removed_table_ids: &HashSet<TableId>,
706        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
707        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
708    ) {
709        for (table_id, change_log_delta) in change_log_delta {
710            let new_change_log = &change_log_delta.new_log;
711            match table_change_log.entry(*table_id) {
712                Entry::Occupied(entry) => {
713                    let change_log = entry.into_mut();
714                    change_log.add_change_log(new_change_log.clone());
715                }
716                Entry::Vacant(entry) => {
717                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
718                }
719            };
720        }
721
722        // If a table has no new change log entry (even an empty one), it means we have stopped maintained
723        // the change log for the table, and then we will remove the table change log.
724        // The table change log will also be removed when the table id is removed.
725        table_change_log.retain(|table_id, _| {
726            if removed_table_ids.contains(table_id) {
727                return false;
728            }
729            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
730                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
731                // the table exists previously, and its committed epoch has progressed.
732            } else {
733                // otherwise, the table change log should be kept anyway
734                return true;
735            }
736            let contains = change_log_delta.contains_key(table_id);
737            if !contains {
738                warn!(
739                        ?table_id,
740                        "table change log dropped due to no further change log at newly committed epoch",
741                    );
742            }
743            contains
744        });
745
746        // truncate the remaining table change log
747        for (table_id, change_log_delta) in change_log_delta {
748            if let Some(change_log) = table_change_log.get_mut(table_id) {
749                change_log.truncate(change_log_delta.truncate_epoch);
750            }
751        }
752    }
753
754    pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
755        let mut ret: BTreeMap<_, _> = BTreeMap::new();
756        for (compaction_group_id, group) in &self.levels {
757            let mut levels = vec![];
758            levels.extend(group.l0.sub_levels.iter());
759            levels.extend(group.levels.iter());
760            for level in levels {
761                for table_info in &level.table_infos {
762                    if table_info.sst_id.inner() == table_info.object_id.inner() {
763                        continue;
764                    }
765                    let object_id = table_info.object_id;
766                    let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
767                    entry
768                        .entry(*compaction_group_id)
769                        .or_default()
770                        .push(table_info.sst_id)
771                }
772            }
773        }
774        ret
775    }
776
777    pub fn merge_compaction_group(
778        &mut self,
779        left_group_id: CompactionGroupId,
780        right_group_id: CompactionGroupId,
781    ) {
782        // Double check
783        let left_group_id_table_ids = self
784            .state_table_info
785            .compaction_group_member_table_ids(left_group_id)
786            .iter()
787            .map(|table_id| table_id.table_id);
788        let right_group_id_table_ids = self
789            .state_table_info
790            .compaction_group_member_table_ids(right_group_id)
791            .iter()
792            .map(|table_id| table_id.table_id);
793
794        assert!(
795            left_group_id_table_ids
796                .chain(right_group_id_table_ids)
797                .is_sorted()
798        );
799
800        let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
801        let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
802            panic!(
803                "compaction group should exist right {} all {:?}",
804                right_group_id, total_cg
805            )
806        });
807
808        let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
809            panic!(
810                "compaction group should exist left {} all {:?}",
811                left_group_id, total_cg
812            )
813        });
814
815        group_split::merge_levels(left_levels, right_levels);
816    }
817
    /// Populates compaction group `group_id` with SSTs taken from its parent group
    /// `parent_group_id`, as part of a group split.
    ///
    /// - If `parent_group_id` is `StaticCompactionGroupId::NewCompactionGroup`, nothing is moved.
    ///   A non-zero `new_sst_start_id` panics in debug builds and is only logged otherwise.
    /// - Otherwise, both compaction groups' `compaction_group_version_id` are incremented. Then,
    ///   for every L0 sub level and every non-L0 level of the parent, the SSTs returned by
    ///   `group_split::split_sst_info_for_level_v2` for `split_key` are inserted into the
    ///   corresponding level of `group_id`. Parent SSTs left with no table ids are removed, and
    ///   emptied parent L0 sub levels are purged. If `split_key` is `None`, no SSTs are moved.
    ///
    /// `new_sst_start_id` seeds the id counter handed to the split helper (presumably to allocate
    /// ids for SSTs created by the split; see `split_sst_info_for_level_v2`).
    ///
    /// # Panics
    /// - If `parent_group_id` does not exist in `self.levels`, or `group_id` does not exist.
    /// - If a moved non-L0 level of `group_id` is not concat-able after insertion.
    /// - If either group ends up with an empty L0 sub level.
    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: HummockSstableId,
        split_key: Option<Bytes>,
    ) {
        // Running id counter, advanced by the split helper as it is used.
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        // Mutable access to both groups at once; panics if `group_id` is missing.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        // L0: move split SSTs sub level by sub level, keeping the parent's sub_level_id/level_type.
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Removing SSTs from a sub level may leave it empty. Empty parent sub levels are
                // purged by the `retain` right after this loop.
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                if insert_table_infos.is_empty() {
                    continue;
                }

                // Drop parent SSTs that no longer contain any table, keeping both the sub level
                // and the L0 size totals in sync.
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                // `Ok(idx)`: append to the existing target sub level at `idx`;
                // `Err(idx)`: create a new target sub level at position `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }

        // Non-L0 levels: level `idx` of the parent feeds level `idx` of the new group.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Keep the target level ordered by key range; it must remain concat-able.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            // Drop parent SSTs that no longer contain any table and update the level sizes.
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        // Invariant: neither group may keep an empty L0 sub level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
955}
956
957impl<T> HummockVersionCommon<T>
958where
959    T: SstableIdReader + ObjectIdReader,
960{
961    pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
962        self.get_sst_infos(exclude_change_log)
963            .map(|s| s.object_id())
964            .collect()
965    }
966
967    pub fn get_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockObjectId> {
968        // DO NOT REMOVE THIS LINE
969        // This is to ensure that when adding new variant to `HummockObjectId`,
970        // the compiler will warn us if we forget to handle it here.
971        match HummockObjectId::Sstable(0.into()) {
972            HummockObjectId::Sstable(_) => {}
973        };
974        self.get_sst_infos(exclude_change_log)
975            .map(|s| HummockObjectId::Sstable(s.object_id()))
976            .collect()
977    }
978
979    pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
980        self.get_sst_infos(exclude_change_log)
981            .map(|s| s.sst_id())
982            .collect()
983    }
984
985    pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
986        let may_table_change_log = if exclude_change_log {
987            None
988        } else {
989            Some(self.table_change_log.values())
990        };
991        self.get_combined_levels()
992            .flat_map(|level| level.table_infos.iter())
993            .chain(
994                may_table_change_log
995                    .map(|v| {
996                        v.flat_map(|table_change_log| {
997                            table_change_log.iter().flat_map(|epoch_change_log| {
998                                epoch_change_log
999                                    .old_value
1000                                    .iter()
1001                                    .chain(epoch_change_log.new_value.iter())
1002                            })
1003                        })
1004                    })
1005                    .into_iter()
1006                    .flatten(),
1007            )
1008    }
1009}
1010
1011impl Levels {
1012    pub(crate) fn apply_compact_ssts(
1013        &mut self,
1014        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
1015        member_table_ids: &BTreeSet<TableId>,
1016    ) {
1017        let IntraLevelDeltaCommon {
1018            level_idx,
1019            l0_sub_level_id,
1020            inserted_table_infos: insert_table_infos,
1021            vnode_partition_count,
1022            removed_table_ids: delete_sst_ids_set,
1023            compaction_group_version_id,
1024        } = level_delta;
1025        let new_vnode_partition_count = *vnode_partition_count;
1026
1027        if is_compaction_task_expired(
1028            self.compaction_group_version_id,
1029            *compaction_group_version_id,
1030        ) {
1031            warn!(
1032                current_compaction_group_version_id = self.compaction_group_version_id,
1033                delta_compaction_group_version_id = compaction_group_version_id,
1034                level_idx,
1035                l0_sub_level_id,
1036                insert_table_infos = ?insert_table_infos
1037                    .iter()
1038                    .map(|sst| (sst.sst_id, sst.object_id))
1039                    .collect_vec(),
1040                ?delete_sst_ids_set,
1041                "This VersionDelta may be committed by an expired compact task. Please check it."
1042            );
1043            return;
1044        }
1045        if !delete_sst_ids_set.is_empty() {
1046            if *level_idx == 0 {
1047                for level in &mut self.l0.sub_levels {
1048                    level_delete_ssts(level, delete_sst_ids_set);
1049                }
1050            } else {
1051                let idx = *level_idx as usize - 1;
1052                level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
1053            }
1054        }
1055
1056        if !insert_table_infos.is_empty() {
1057            let insert_sst_level_id = *level_idx;
1058            let insert_sub_level_id = *l0_sub_level_id;
1059            if insert_sst_level_id == 0 {
1060                let l0 = &mut self.l0;
1061                let index = l0
1062                    .sub_levels
1063                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
1064                assert!(
1065                    index < l0.sub_levels.len()
1066                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
1067                    "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
1068                    insert_sub_level_id,
1069                    delete_sst_ids_set,
1070                    l0.sub_levels
1071                        .iter()
1072                        .map(|level| level.sub_level_id)
1073                        .collect_vec()
1074                );
1075                if l0.sub_levels[index].table_infos.is_empty()
1076                    && member_table_ids.len() == 1
1077                    && insert_table_infos.iter().all(|sst| {
1078                        sst.table_ids.len() == 1
1079                            && sst.table_ids[0]
1080                                == member_table_ids.iter().next().expect("non-empty").table_id
1081                    })
1082                {
1083                    // Only change vnode_partition_count for group which has only one state-table.
1084                    // Only change vnode_partition_count for level which update all sst files in this compact task.
1085                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
1086                }
1087                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
1088            } else {
1089                let idx = insert_sst_level_id as usize - 1;
1090                if self.levels[idx].table_infos.is_empty()
1091                    && insert_table_infos
1092                        .iter()
1093                        .all(|sst| sst.table_ids.len() == 1)
1094                {
1095                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
1096                } else if self.levels[idx].vnode_partition_count != 0
1097                    && new_vnode_partition_count == 0
1098                    && member_table_ids.len() > 1
1099                {
1100                    self.levels[idx].vnode_partition_count = 0;
1101                }
1102                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
1103            }
1104        }
1105    }
1106
1107    pub(crate) fn post_apply_l0_compact(&mut self) {
1108        {
1109            self.l0
1110                .sub_levels
1111                .retain(|level| !level.table_infos.is_empty());
1112            self.l0.total_file_size = self
1113                .l0
1114                .sub_levels
1115                .iter()
1116                .map(|level| level.total_file_size)
1117                .sum::<u64>();
1118            self.l0.uncompressed_file_size = self
1119                .l0
1120                .sub_levels
1121                .iter()
1122                .map(|level| level.uncompressed_file_size)
1123                .sum::<u64>();
1124        }
1125    }
1126}
1127
1128impl<T, L> HummockVersionCommon<T, L> {
1129    pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1130        self.levels
1131            .values()
1132            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1133    }
1134}
1135
1136pub fn build_initial_compaction_group_levels(
1137    group_id: CompactionGroupId,
1138    compaction_config: &CompactionConfig,
1139) -> Levels {
1140    let mut levels = vec![];
1141    for l in 0..compaction_config.get_max_level() {
1142        levels.push(Level {
1143            level_idx: (l + 1) as u32,
1144            level_type: PbLevelType::Nonoverlapping,
1145            table_infos: vec![],
1146            total_file_size: 0,
1147            sub_level_id: 0,
1148            uncompressed_file_size: 0,
1149            vnode_partition_count: 0,
1150        });
1151    }
1152    #[expect(deprecated)] // for backward-compatibility of previous hummock version delta
1153    Levels {
1154        levels,
1155        l0: OverlappingLevel {
1156            sub_levels: vec![],
1157            total_file_size: 0,
1158            uncompressed_file_size: 0,
1159        },
1160        group_id,
1161        parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
1162        member_table_ids: vec![],
1163        compaction_group_version_id: 0,
1164    }
1165}
1166
1167fn split_sst_info_for_level(
1168    member_table_ids: &BTreeSet<u32>,
1169    level: &mut Level,
1170    new_sst_id: &mut HummockSstableId,
1171) -> Vec<SstableInfo> {
1172    // Remove SST from sub level may result in empty sub level. It will be purged
1173    // whenever another compaction task is finished.
1174    let mut insert_table_infos = vec![];
1175    for sst_info in &mut level.table_infos {
1176        let removed_table_ids = sst_info
1177            .table_ids
1178            .iter()
1179            .filter(|table_id| member_table_ids.contains(table_id))
1180            .cloned()
1181            .collect_vec();
1182        let sst_size = sst_info.sst_size;
1183        if sst_size / 2 == 0 {
1184            tracing::warn!(
1185                id = %sst_info.sst_id,
1186                object_id = %sst_info.object_id,
1187                sst_size = sst_info.sst_size,
1188                file_size = sst_info.file_size,
1189                "Sstable sst_size is under expected",
1190            );
1191        };
1192        if !removed_table_ids.is_empty() {
1193            let (modified_sst, branch_sst) = split_sst_with_table_ids(
1194                sst_info,
1195                new_sst_id,
1196                sst_size / 2,
1197                sst_size / 2,
1198                member_table_ids.iter().cloned().collect_vec(),
1199            );
1200            *sst_info = modified_sst;
1201            insert_table_infos.push(branch_sst);
1202        }
1203    }
1204    insert_table_infos
1205}
1206
1207/// Gets all compaction group ids.
1208pub fn get_compaction_group_ids(
1209    version: &HummockVersion,
1210) -> impl Iterator<Item = CompactionGroupId> + '_ {
1211    version.levels.keys().cloned()
1212}
1213
1214pub fn get_table_compaction_group_id_mapping(
1215    version: &HummockVersion,
1216) -> HashMap<StateTableId, CompactionGroupId> {
1217    version
1218        .state_table_info
1219        .info()
1220        .iter()
1221        .map(|(table_id, info)| (table_id.table_id, info.compaction_group_id))
1222        .collect()
1223}
1224
1225/// Gets all SSTs in `group_id`
1226pub fn get_compaction_group_ssts(
1227    version: &HummockVersion,
1228    group_id: CompactionGroupId,
1229) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1230    let group_levels = version.get_compaction_group_levels(group_id);
1231    group_levels
1232        .l0
1233        .sub_levels
1234        .iter()
1235        .rev()
1236        .chain(group_levels.levels.iter())
1237        .flat_map(|level| {
1238            level
1239                .table_infos
1240                .iter()
1241                .map(|table_info| (table_info.object_id, table_info.sst_id))
1242        })
1243}
1244
1245pub fn new_sub_level(
1246    sub_level_id: u64,
1247    level_type: PbLevelType,
1248    table_infos: Vec<SstableInfo>,
1249) -> Level {
1250    if level_type == PbLevelType::Nonoverlapping {
1251        debug_assert!(
1252            can_concat(&table_infos),
1253            "sst of non-overlapping level is not concat-able: {:?}",
1254            table_infos
1255        );
1256    }
1257    let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1258    let uncompressed_file_size = table_infos
1259        .iter()
1260        .map(|table| table.uncompressed_file_size)
1261        .sum();
1262    Level {
1263        level_idx: 0,
1264        level_type,
1265        table_infos,
1266        total_file_size,
1267        sub_level_id,
1268        uncompressed_file_size,
1269        vnode_partition_count: 0,
1270    }
1271}
1272
1273pub fn add_ssts_to_sub_level(
1274    l0: &mut OverlappingLevel,
1275    sub_level_idx: usize,
1276    insert_table_infos: Vec<SstableInfo>,
1277) {
1278    insert_table_infos.iter().for_each(|sst| {
1279        l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1280        l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1281        l0.total_file_size += sst.sst_size;
1282        l0.uncompressed_file_size += sst.uncompressed_file_size;
1283    });
1284    l0.sub_levels[sub_level_idx]
1285        .table_infos
1286        .extend(insert_table_infos);
1287    if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1288        l0.sub_levels[sub_level_idx]
1289            .table_infos
1290            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1291        assert!(
1292            can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1293            "sstable ids: {:?}",
1294            l0.sub_levels[sub_level_idx]
1295                .table_infos
1296                .iter()
1297                .map(|sst| sst.sst_id)
1298                .collect_vec()
1299        );
1300    }
1301}
1302
1303/// `None` value of `sub_level_insert_hint` means append.
1304pub fn insert_new_sub_level(
1305    l0: &mut OverlappingLevel,
1306    insert_sub_level_id: u64,
1307    level_type: PbLevelType,
1308    insert_table_infos: Vec<SstableInfo>,
1309    sub_level_insert_hint: Option<usize>,
1310) {
1311    if insert_sub_level_id == u64::MAX {
1312        return;
1313    }
1314    let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1315        insert_pos
1316    } else {
1317        if let Some(newest_level) = l0.sub_levels.last() {
1318            assert!(
1319                newest_level.sub_level_id < insert_sub_level_id,
1320                "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1321                newest_level.sub_level_id,
1322                insert_sub_level_id,
1323                l0,
1324            );
1325        }
1326        l0.sub_levels.len()
1327    };
1328    #[cfg(debug_assertions)]
1329    {
1330        if insert_pos > 0 {
1331            if let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1) {
1332                debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1333            }
1334        }
1335        if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1336            debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1337        }
1338    }
1339    // All files will be committed in one new Overlapping sub-level and become
1340    // Nonoverlapping  after at least one compaction.
1341    let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1342    l0.total_file_size += level.total_file_size;
1343    l0.uncompressed_file_size += level.uncompressed_file_size;
1344    l0.sub_levels.insert(insert_pos, level);
1345}
1346
1347/// Delete sstables if the table id is in the id set.
1348///
1349/// Return `true` if some sst is deleted, and `false` is the deletion is trivial
1350fn level_delete_ssts(
1351    operand: &mut Level,
1352    delete_sst_ids_superset: &HashSet<HummockSstableId>,
1353) -> bool {
1354    let original_len = operand.table_infos.len();
1355    operand
1356        .table_infos
1357        .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1358    operand.total_file_size = operand
1359        .table_infos
1360        .iter()
1361        .map(|table| table.sst_size)
1362        .sum::<u64>();
1363    operand.uncompressed_file_size = operand
1364        .table_infos
1365        .iter()
1366        .map(|table| table.uncompressed_file_size)
1367        .sum::<u64>();
1368    original_len != operand.table_infos.len()
1369}
1370
1371fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1372    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1373        format!(
1374            "sstable ids: {:?}",
1375            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1376        )
1377    }
1378    operand.total_file_size += insert_table_infos
1379        .iter()
1380        .map(|sst| sst.sst_size)
1381        .sum::<u64>();
1382    operand.uncompressed_file_size += insert_table_infos
1383        .iter()
1384        .map(|sst| sst.uncompressed_file_size)
1385        .sum::<u64>();
1386    if operand.level_type == PbLevelType::Overlapping {
1387        operand.level_type = PbLevelType::Nonoverlapping;
1388        operand
1389            .table_infos
1390            .extend(insert_table_infos.iter().cloned());
1391        operand
1392            .table_infos
1393            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1394        assert!(
1395            can_concat(&operand.table_infos),
1396            "{}",
1397            display_sstable_infos(&operand.table_infos)
1398        );
1399    } else if !insert_table_infos.is_empty() {
1400        let sorted_insert: Vec<_> = insert_table_infos
1401            .iter()
1402            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1403            .cloned()
1404            .collect();
1405        let first = &sorted_insert[0];
1406        let last = &sorted_insert[sorted_insert.len() - 1];
1407        let pos = operand
1408            .table_infos
1409            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1410        if pos >= operand.table_infos.len()
1411            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1412        {
1413            operand.table_infos.splice(pos..pos, sorted_insert);
1414            // Validate the inserted SST batch along with the two SSTs that precede and follow it.
1415            let validate_range = operand
1416                .table_infos
1417                .iter()
1418                .skip(pos.saturating_sub(1))
1419                .take(insert_table_infos.len() + 2)
1420                .collect_vec();
1421            assert!(
1422                can_concat(&validate_range),
1423                "{}",
1424                display_sstable_infos(&validate_range),
1425            );
1426        } else {
1427            // If this branch is reached, it indicates some unexpected behavior in compaction.
1428            // Here we issue a warning and fall back to insert one by one.
1429            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1430            for i in insert_table_infos {
1431                let pos = operand
1432                    .table_infos
1433                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1434                operand.table_infos.insert(pos, i.clone());
1435            }
1436            assert!(
1437                can_concat(&operand.table_infos),
1438                "{}",
1439                display_sstable_infos(&operand.table_infos)
1440            );
1441        }
1442    }
1443}
1444
1445pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1446    // DO NOT REMOVE THIS LINE
1447    // This is to ensure that when adding new variant to `HummockObjectId`,
1448    // the compiler will warn us if we forget to handle it here.
1449    match HummockObjectId::Sstable(0.into()) {
1450        HummockObjectId::Sstable(_) => {}
1451    };
1452    version
1453        .levels
1454        .values()
1455        .flat_map(|cg| {
1456            cg.level0()
1457                .sub_levels
1458                .iter()
1459                .chain(cg.levels.iter())
1460                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1461        })
1462        .chain(version.table_change_log.values().flat_map(|c| {
1463            c.iter().flat_map(|l| {
1464                l.old_value
1465                    .iter()
1466                    .chain(l.new_value.iter())
1467                    .map(|t| (t.object_id, t.file_size))
1468            })
1469        }))
1470        .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1471        .collect()
1472}
1473
1474/// Verify the validity of a `HummockVersion` and return a list of violations if any.
1475/// Currently this method is only used by risectl validate-version.
1476pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1477    let mut res = Vec::new();
1478    // Ensure each table maps to only one compaction group
1479    for (group_id, levels) in &version.levels {
1480        // Ensure compaction group id matches
1481        if levels.group_id != *group_id {
1482            res.push(format!(
1483                "GROUP {}: inconsistent group id {} in Levels",
1484                group_id, levels.group_id
1485            ));
1486        }
1487
1488        let validate_level = |group: CompactionGroupId,
1489                              expected_level_idx: u32,
1490                              level: &Level,
1491                              res: &mut Vec<String>| {
1492            let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1493            if level.level_idx == 0 {
1494                level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1495                // Ensure sub-level is not empty
1496                if level.table_infos.is_empty() {
1497                    res.push(format!("{}: empty level", level_identifier));
1498                }
1499            } else if level.level_type != PbLevelType::Nonoverlapping {
1500                // Ensure non-L0 level is non-overlapping level
1501                res.push(format!(
1502                    "{}: level type {:?} is not non-overlapping",
1503                    level_identifier, level.level_type
1504                ));
1505            }
1506
1507            // Ensure level idx matches
1508            if level.level_idx != expected_level_idx {
1509                res.push(format!(
1510                    "{}: mismatched level idx {}",
1511                    level_identifier, expected_level_idx
1512                ));
1513            }
1514
1515            let mut prev_table_info: Option<&SstableInfo> = None;
1516            for table_info in &level.table_infos {
1517                // Ensure table_ids are sorted and unique
1518                if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1519                    res.push(format!(
1520                        "{} SST {}: table_ids not sorted",
1521                        level_identifier, table_info.object_id
1522                    ));
1523                }
1524
1525                // Ensure SSTs in non-overlapping level have non-overlapping key range
1526                if level.level_type == PbLevelType::Nonoverlapping {
1527                    if let Some(prev) = prev_table_info.take() {
1528                        if prev
1529                            .key_range
1530                            .compare_right_with(&table_info.key_range.left)
1531                            != Ordering::Less
1532                        {
1533                            res.push(format!(
1534                                "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1535                                level_identifier, table_info.object_id, prev, table_info
1536                            ));
1537                        }
1538                    }
1539                    let _ = prev_table_info.insert(table_info);
1540                }
1541            }
1542        };
1543
1544        let l0 = &levels.l0;
1545        let mut prev_sub_level_id = u64::MAX;
1546        for sub_level in &l0.sub_levels {
1547            // Ensure sub_level_id is sorted and unique
1548            if sub_level.sub_level_id >= prev_sub_level_id {
1549                res.push(format!(
1550                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1551                    group_id, sub_level.level_idx, prev_sub_level_id
1552                ));
1553            }
1554            prev_sub_level_id = sub_level.sub_level_id;
1555
1556            validate_level(*group_id, 0, sub_level, &mut res);
1557        }
1558
1559        for idx in 1..=levels.levels.len() {
1560            validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1561        }
1562    }
1563    res
1564}
1565
1566#[cfg(test)]
1567mod tests {
1568    use std::collections::{HashMap, HashSet};
1569
1570    use bytes::Bytes;
1571    use risingwave_common::catalog::TableId;
1572    use risingwave_common::hash::VirtualNode;
1573    use risingwave_common::util::epoch::test_epoch;
1574    use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1575
1576    use super::group_split;
1577    use crate::HummockVersionId;
1578    use crate::compaction_group::group_split::*;
1579    use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1580    use crate::key::{FullKey, gen_key_from_str};
1581    use crate::key_range::KeyRange;
1582    use crate::level::{Level, Levels, OverlappingLevel};
1583    use crate::sstable_info::{SstableInfo, SstableInfoInner};
1584    use crate::version::{
1585        GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1586    };
1587
1588    fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1589        gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1590    }
1591
1592    fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1593        let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1594        let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1595        let full_key_l = FullKey::for_test(
1596            TableId::new(*table_ids.first().unwrap()),
1597            table_key_l,
1598            epoch,
1599        )
1600        .encode();
1601        let full_key_r =
1602            FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1603                .encode();
1604
1605        SstableInfoInner {
1606            sst_id: sst_id.into(),
1607            key_range: KeyRange {
1608                left: full_key_l.into(),
1609                right: full_key_r.into(),
1610                right_exclusive: false,
1611            },
1612            table_ids,
1613            object_id: sst_id.into(),
1614            min_epoch: 20,
1615            max_epoch: 20,
1616            file_size: 100,
1617            sst_size: 100,
1618            ..Default::default()
1619        }
1620    }
1621
1622    #[test]
1623    fn test_get_sst_object_ids() {
1624        let mut version = HummockVersion {
1625            id: HummockVersionId::new(0),
1626            levels: HashMap::from_iter([(
1627                0,
1628                Levels {
1629                    levels: vec![],
1630                    l0: OverlappingLevel {
1631                        sub_levels: vec![],
1632                        total_file_size: 0,
1633                        uncompressed_file_size: 0,
1634                    },
1635                    ..Default::default()
1636                },
1637            )]),
1638            ..Default::default()
1639        };
1640        assert_eq!(version.get_object_ids(false).len(), 0);
1641
1642        // Add to sub level
1643        version
1644            .levels
1645            .get_mut(&0)
1646            .unwrap()
1647            .l0
1648            .sub_levels
1649            .push(Level {
1650                table_infos: vec![
1651                    SstableInfoInner {
1652                        object_id: 11.into(),
1653                        sst_id: 11.into(),
1654                        ..Default::default()
1655                    }
1656                    .into(),
1657                ],
1658                ..Default::default()
1659            });
1660        assert_eq!(version.get_object_ids(false).len(), 1);
1661
1662        // Add to non sub level
1663        version.levels.get_mut(&0).unwrap().levels.push(Level {
1664            table_infos: vec![
1665                SstableInfoInner {
1666                    object_id: 22.into(),
1667                    sst_id: 22.into(),
1668                    ..Default::default()
1669                }
1670                .into(),
1671            ],
1672            ..Default::default()
1673        });
1674        assert_eq!(version.get_object_ids(false).len(), 2);
1675    }
1676
1677    #[test]
1678    fn test_apply_version_delta() {
1679        let mut version = HummockVersion {
1680            id: HummockVersionId::new(0),
1681            levels: HashMap::from_iter([
1682                (
1683                    0,
1684                    build_initial_compaction_group_levels(
1685                        0,
1686                        &CompactionConfig {
1687                            max_level: 6,
1688                            ..Default::default()
1689                        },
1690                    ),
1691                ),
1692                (
1693                    1,
1694                    build_initial_compaction_group_levels(
1695                        1,
1696                        &CompactionConfig {
1697                            max_level: 6,
1698                            ..Default::default()
1699                        },
1700                    ),
1701                ),
1702            ]),
1703            ..Default::default()
1704        };
1705        let version_delta = HummockVersionDelta {
1706            id: HummockVersionId::new(1),
1707            group_deltas: HashMap::from_iter([
1708                (
1709                    2,
1710                    GroupDeltas {
1711                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1712                            group_config: Some(CompactionConfig {
1713                                max_level: 6,
1714                                ..Default::default()
1715                            }),
1716                            ..Default::default()
1717                        }))],
1718                    },
1719                ),
1720                (
1721                    0,
1722                    GroupDeltas {
1723                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1724                    },
1725                ),
1726                (
1727                    1,
1728                    GroupDeltas {
1729                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1730                            1,
1731                            0,
1732                            HashSet::new(),
1733                            vec![
1734                                SstableInfoInner {
1735                                    object_id: 1.into(),
1736                                    sst_id: 1.into(),
1737                                    ..Default::default()
1738                                }
1739                                .into(),
1740                            ],
1741                            0,
1742                            version
1743                                .levels
1744                                .get(&1)
1745                                .as_ref()
1746                                .unwrap()
1747                                .compaction_group_version_id,
1748                        ))],
1749                    },
1750                ),
1751            ]),
1752            ..Default::default()
1753        };
1754        let version_delta = version_delta;
1755
1756        version.apply_version_delta(&version_delta);
1757        let mut cg1 = build_initial_compaction_group_levels(
1758            1,
1759            &CompactionConfig {
1760                max_level: 6,
1761                ..Default::default()
1762            },
1763        );
1764        cg1.levels[0] = Level {
1765            level_idx: 1,
1766            level_type: LevelType::Nonoverlapping,
1767            table_infos: vec![
1768                SstableInfoInner {
1769                    object_id: 1.into(),
1770                    sst_id: 1.into(),
1771                    ..Default::default()
1772                }
1773                .into(),
1774            ],
1775            ..Default::default()
1776        };
1777        assert_eq!(
1778            version,
1779            HummockVersion {
1780                id: HummockVersionId::new(1),
1781                levels: HashMap::from_iter([
1782                    (
1783                        2,
1784                        build_initial_compaction_group_levels(
1785                            2,
1786                            &CompactionConfig {
1787                                max_level: 6,
1788                                ..Default::default()
1789                            },
1790                        ),
1791                    ),
1792                    (1, cg1),
1793                ]),
1794                ..Default::default()
1795            }
1796        );
1797    }
1798
1799    fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1800        gen_sst_info_impl(object_id, table_ids, left, right).into()
1801    }
1802
1803    fn gen_sst_info_impl(
1804        object_id: u64,
1805        table_ids: Vec<u32>,
1806        left: Bytes,
1807        right: Bytes,
1808    ) -> SstableInfoInner {
1809        SstableInfoInner {
1810            object_id: object_id.into(),
1811            sst_id: object_id.into(),
1812            key_range: KeyRange {
1813                left,
1814                right,
1815                right_exclusive: false,
1816            },
1817            table_ids,
1818            file_size: 100,
1819            sst_size: 100,
1820            uncompressed_file_size: 100,
1821            ..Default::default()
1822        }
1823    }
1824
1825    #[test]
1826    fn test_merge_levels() {
1827        let mut left_levels = build_initial_compaction_group_levels(
1828            1,
1829            &CompactionConfig {
1830                max_level: 6,
1831                ..Default::default()
1832            },
1833        );
1834
1835        let mut right_levels = build_initial_compaction_group_levels(
1836            2,
1837            &CompactionConfig {
1838                max_level: 6,
1839                ..Default::default()
1840            },
1841        );
1842
1843        left_levels.levels[0] = Level {
1844            level_idx: 1,
1845            level_type: LevelType::Nonoverlapping,
1846            table_infos: vec![
1847                gen_sst_info(
1848                    1,
1849                    vec![3],
1850                    FullKey::for_test(
1851                        TableId::new(3),
1852                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1853                        0,
1854                    )
1855                    .encode()
1856                    .into(),
1857                    FullKey::for_test(
1858                        TableId::new(3),
1859                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1860                        0,
1861                    )
1862                    .encode()
1863                    .into(),
1864                ),
1865                gen_sst_info(
1866                    10,
1867                    vec![3, 4],
1868                    FullKey::for_test(
1869                        TableId::new(3),
1870                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1871                        0,
1872                    )
1873                    .encode()
1874                    .into(),
1875                    FullKey::for_test(
1876                        TableId::new(4),
1877                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1878                        0,
1879                    )
1880                    .encode()
1881                    .into(),
1882                ),
1883                gen_sst_info(
1884                    11,
1885                    vec![4],
1886                    FullKey::for_test(
1887                        TableId::new(4),
1888                        gen_key_from_str(VirtualNode::from_index(11), "1"),
1889                        0,
1890                    )
1891                    .encode()
1892                    .into(),
1893                    FullKey::for_test(
1894                        TableId::new(4),
1895                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1896                        0,
1897                    )
1898                    .encode()
1899                    .into(),
1900                ),
1901            ],
1902            total_file_size: 300,
1903            ..Default::default()
1904        };
1905
1906        left_levels.l0.sub_levels.push(Level {
1907            level_idx: 0,
1908            table_infos: vec![gen_sst_info(
1909                3,
1910                vec![3],
1911                FullKey::for_test(
1912                    TableId::new(3),
1913                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1914                    0,
1915                )
1916                .encode()
1917                .into(),
1918                FullKey::for_test(
1919                    TableId::new(3),
1920                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1921                    0,
1922                )
1923                .encode()
1924                .into(),
1925            )],
1926            sub_level_id: 101,
1927            level_type: LevelType::Overlapping,
1928            total_file_size: 100,
1929            ..Default::default()
1930        });
1931
1932        left_levels.l0.sub_levels.push(Level {
1933            level_idx: 0,
1934            table_infos: vec![gen_sst_info(
1935                3,
1936                vec![3],
1937                FullKey::for_test(
1938                    TableId::new(3),
1939                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1940                    0,
1941                )
1942                .encode()
1943                .into(),
1944                FullKey::for_test(
1945                    TableId::new(3),
1946                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1947                    0,
1948                )
1949                .encode()
1950                .into(),
1951            )],
1952            sub_level_id: 103,
1953            level_type: LevelType::Overlapping,
1954            total_file_size: 100,
1955            ..Default::default()
1956        });
1957
1958        left_levels.l0.sub_levels.push(Level {
1959            level_idx: 0,
1960            table_infos: vec![gen_sst_info(
1961                3,
1962                vec![3],
1963                FullKey::for_test(
1964                    TableId::new(3),
1965                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1966                    0,
1967                )
1968                .encode()
1969                .into(),
1970                FullKey::for_test(
1971                    TableId::new(3),
1972                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1973                    0,
1974                )
1975                .encode()
1976                .into(),
1977            )],
1978            sub_level_id: 105,
1979            level_type: LevelType::Nonoverlapping,
1980            total_file_size: 100,
1981            ..Default::default()
1982        });
1983
1984        right_levels.levels[0] = Level {
1985            level_idx: 1,
1986            level_type: LevelType::Nonoverlapping,
1987            table_infos: vec![
1988                gen_sst_info(
1989                    1,
1990                    vec![5],
1991                    FullKey::for_test(
1992                        TableId::new(5),
1993                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1994                        0,
1995                    )
1996                    .encode()
1997                    .into(),
1998                    FullKey::for_test(
1999                        TableId::new(5),
2000                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2001                        0,
2002                    )
2003                    .encode()
2004                    .into(),
2005                ),
2006                gen_sst_info(
2007                    10,
2008                    vec![5, 6],
2009                    FullKey::for_test(
2010                        TableId::new(5),
2011                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2012                        0,
2013                    )
2014                    .encode()
2015                    .into(),
2016                    FullKey::for_test(
2017                        TableId::new(6),
2018                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2019                        0,
2020                    )
2021                    .encode()
2022                    .into(),
2023                ),
2024                gen_sst_info(
2025                    11,
2026                    vec![6],
2027                    FullKey::for_test(
2028                        TableId::new(6),
2029                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2030                        0,
2031                    )
2032                    .encode()
2033                    .into(),
2034                    FullKey::for_test(
2035                        TableId::new(6),
2036                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2037                        0,
2038                    )
2039                    .encode()
2040                    .into(),
2041                ),
2042            ],
2043            total_file_size: 300,
2044            ..Default::default()
2045        };
2046
2047        right_levels.l0.sub_levels.push(Level {
2048            level_idx: 0,
2049            table_infos: vec![gen_sst_info(
2050                3,
2051                vec![5],
2052                FullKey::for_test(
2053                    TableId::new(5),
2054                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2055                    0,
2056                )
2057                .encode()
2058                .into(),
2059                FullKey::for_test(
2060                    TableId::new(5),
2061                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2062                    0,
2063                )
2064                .encode()
2065                .into(),
2066            )],
2067            sub_level_id: 101,
2068            level_type: LevelType::Overlapping,
2069            total_file_size: 100,
2070            ..Default::default()
2071        });
2072
2073        right_levels.l0.sub_levels.push(Level {
2074            level_idx: 0,
2075            table_infos: vec![gen_sst_info(
2076                5,
2077                vec![5],
2078                FullKey::for_test(
2079                    TableId::new(5),
2080                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2081                    0,
2082                )
2083                .encode()
2084                .into(),
2085                FullKey::for_test(
2086                    TableId::new(5),
2087                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2088                    0,
2089                )
2090                .encode()
2091                .into(),
2092            )],
2093            sub_level_id: 102,
2094            level_type: LevelType::Overlapping,
2095            total_file_size: 100,
2096            ..Default::default()
2097        });
2098
2099        right_levels.l0.sub_levels.push(Level {
2100            level_idx: 0,
2101            table_infos: vec![gen_sst_info(
2102                3,
2103                vec![5],
2104                FullKey::for_test(
2105                    TableId::new(5),
2106                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2107                    0,
2108                )
2109                .encode()
2110                .into(),
2111                FullKey::for_test(
2112                    TableId::new(5),
2113                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2114                    0,
2115                )
2116                .encode()
2117                .into(),
2118            )],
2119            sub_level_id: 103,
2120            level_type: LevelType::Nonoverlapping,
2121            total_file_size: 100,
2122            ..Default::default()
2123        });
2124
2125        {
2126            // test empty
2127            let mut left_levels = Levels::default();
2128            let right_levels = Levels::default();
2129
2130            group_split::merge_levels(&mut left_levels, right_levels);
2131        }
2132
2133        {
2134            // test empty left
2135            let mut left_levels = build_initial_compaction_group_levels(
2136                1,
2137                &CompactionConfig {
2138                    max_level: 6,
2139                    ..Default::default()
2140                },
2141            );
2142            let right_levels = right_levels.clone();
2143
2144            group_split::merge_levels(&mut left_levels, right_levels);
2145
2146            assert!(left_levels.l0.sub_levels.len() == 3);
2147            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2148            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2149            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2150            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2151            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2152            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2153
2154            assert!(left_levels.levels[0].level_idx == 1);
2155            assert_eq!(300, left_levels.levels[0].total_file_size);
2156        }
2157
2158        {
2159            // test empty right
2160            let mut left_levels = left_levels.clone();
2161            let right_levels = build_initial_compaction_group_levels(
2162                2,
2163                &CompactionConfig {
2164                    max_level: 6,
2165                    ..Default::default()
2166                },
2167            );
2168
2169            group_split::merge_levels(&mut left_levels, right_levels);
2170
2171            assert!(left_levels.l0.sub_levels.len() == 3);
2172            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2173            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2174            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2175            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2176            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2177            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2178
2179            assert!(left_levels.levels[0].level_idx == 1);
2180            assert_eq!(300, left_levels.levels[0].total_file_size);
2181        }
2182
2183        {
2184            let mut left_levels = left_levels.clone();
2185            let right_levels = right_levels.clone();
2186
2187            group_split::merge_levels(&mut left_levels, right_levels);
2188
2189            assert!(left_levels.l0.sub_levels.len() == 6);
2190            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2191            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2192            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2193            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2194            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2195            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2196            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2197            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2198            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2199            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2200            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2201            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2202
2203            assert!(left_levels.levels[0].level_idx == 1);
2204            assert_eq!(600, left_levels.levels[0].total_file_size);
2205        }
2206    }
2207
2208    #[test]
2209    fn test_get_split_pos() {
2210        let epoch = test_epoch(1);
2211        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2212        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2213        let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2214
2215        let ssts = vec![s1, s2, s3];
2216        let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2217
2218        let pos = group_split::get_split_pos(&ssts, split_key.clone());
2219        assert_eq!(1, pos);
2220
2221        let pos = group_split::get_split_pos(&vec![], split_key);
2222        assert_eq!(0, pos);
2223    }
2224
2225    #[test]
2226    fn test_split_sst() {
2227        let epoch = test_epoch(1);
2228        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2229
2230        {
2231            let split_key = group_split::build_split_key(3, VirtualNode::ZERO);
2232            let origin_sst = sst.clone();
2233            let sst_size = origin_sst.sst_size;
2234            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2235            assert_eq!(SstSplitType::Both, split_type);
2236
2237            let mut new_sst_id = 10.into();
2238            let (origin_sst, branched_sst) = group_split::split_sst(
2239                origin_sst,
2240                &mut new_sst_id,
2241                split_key,
2242                sst_size / 2,
2243                sst_size / 2,
2244            );
2245
2246            let origin_sst = origin_sst.unwrap();
2247            let branched_sst = branched_sst.unwrap();
2248
2249            assert!(origin_sst.key_range.right_exclusive);
2250            assert!(
2251                origin_sst
2252                    .key_range
2253                    .right
2254                    .cmp(&branched_sst.key_range.left)
2255                    .is_le()
2256            );
2257            assert!(origin_sst.table_ids.is_sorted());
2258            assert!(branched_sst.table_ids.is_sorted());
2259            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2260            assert!(branched_sst.sst_size < origin_sst.file_size);
2261            assert_eq!(10, branched_sst.sst_id);
2262            assert_eq!(11, origin_sst.sst_id);
2263            assert_eq!(&3, branched_sst.table_ids.first().unwrap()); // split table_id to right
2264        }
2265
2266        {
2267            // test un-exist table_id
2268            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2269            let origin_sst = sst.clone();
2270            let sst_size = origin_sst.sst_size;
2271            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2272            assert_eq!(SstSplitType::Both, split_type);
2273
2274            let mut new_sst_id = 10.into();
2275            let (origin_sst, branched_sst) = group_split::split_sst(
2276                origin_sst,
2277                &mut new_sst_id,
2278                split_key,
2279                sst_size / 2,
2280                sst_size / 2,
2281            );
2282
2283            let origin_sst = origin_sst.unwrap();
2284            let branched_sst = branched_sst.unwrap();
2285
2286            assert!(origin_sst.key_range.right_exclusive);
2287            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2288            assert!(origin_sst.table_ids.is_sorted());
2289            assert!(branched_sst.table_ids.is_sorted());
2290            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2291            assert!(branched_sst.sst_size < origin_sst.file_size);
2292            assert_eq!(10, branched_sst.sst_id);
2293            assert_eq!(11, origin_sst.sst_id);
2294            assert_eq!(&5, branched_sst.table_ids.first().unwrap()); // split table_id to right
2295        }
2296
2297        {
2298            let split_key = group_split::build_split_key(6, VirtualNode::ZERO);
2299            let origin_sst = sst.clone();
2300            let split_type = group_split::need_to_split(&origin_sst, split_key);
2301            assert_eq!(SstSplitType::Left, split_type);
2302        }
2303
2304        {
2305            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2306            let origin_sst = sst.clone();
2307            let split_type = group_split::need_to_split(&origin_sst, split_key);
2308            assert_eq!(SstSplitType::Both, split_type);
2309
2310            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2311            let origin_sst = sst.clone();
2312            let split_type = group_split::need_to_split(&origin_sst, split_key);
2313            assert_eq!(SstSplitType::Right, split_type);
2314        }
2315
2316        {
2317            // test key_range left = right
2318            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2319            sst.key_range.right = sst.key_range.left.clone();
2320            let sst: SstableInfo = sst.into();
2321            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2322            let origin_sst = sst.clone();
2323            let sst_size = origin_sst.sst_size;
2324
2325            let mut new_sst_id = 10.into();
2326            let (origin_sst, branched_sst) = group_split::split_sst(
2327                origin_sst,
2328                &mut new_sst_id,
2329                split_key,
2330                sst_size / 2,
2331                sst_size / 2,
2332            );
2333
2334            assert!(origin_sst.is_none());
2335            assert!(branched_sst.is_some());
2336        }
2337    }
2338
2339    #[test]
2340    fn test_split_sst_info_for_level() {
2341        let mut version = HummockVersion {
2342            id: HummockVersionId(0),
2343            levels: HashMap::from_iter([(
2344                1,
2345                build_initial_compaction_group_levels(
2346                    1,
2347                    &CompactionConfig {
2348                        max_level: 6,
2349                        ..Default::default()
2350                    },
2351                ),
2352            )]),
2353            ..Default::default()
2354        };
2355
2356        let cg1 = version.levels.get_mut(&1).unwrap();
2357
2358        cg1.levels[0] = Level {
2359            level_idx: 1,
2360            level_type: LevelType::Nonoverlapping,
2361            table_infos: vec![
2362                gen_sst_info(
2363                    1,
2364                    vec![3],
2365                    FullKey::for_test(
2366                        TableId::new(3),
2367                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2368                        0,
2369                    )
2370                    .encode()
2371                    .into(),
2372                    FullKey::for_test(
2373                        TableId::new(3),
2374                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2375                        0,
2376                    )
2377                    .encode()
2378                    .into(),
2379                ),
2380                gen_sst_info(
2381                    10,
2382                    vec![3, 4],
2383                    FullKey::for_test(
2384                        TableId::new(3),
2385                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2386                        0,
2387                    )
2388                    .encode()
2389                    .into(),
2390                    FullKey::for_test(
2391                        TableId::new(4),
2392                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2393                        0,
2394                    )
2395                    .encode()
2396                    .into(),
2397                ),
2398                gen_sst_info(
2399                    11,
2400                    vec![4],
2401                    FullKey::for_test(
2402                        TableId::new(4),
2403                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2404                        0,
2405                    )
2406                    .encode()
2407                    .into(),
2408                    FullKey::for_test(
2409                        TableId::new(4),
2410                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2411                        0,
2412                    )
2413                    .encode()
2414                    .into(),
2415                ),
2416            ],
2417            total_file_size: 300,
2418            ..Default::default()
2419        };
2420
2421        cg1.l0.sub_levels.push(Level {
2422            level_idx: 0,
2423            table_infos: vec![
2424                gen_sst_info(
2425                    2,
2426                    vec![2],
2427                    FullKey::for_test(
2428                        TableId::new(0),
2429                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2430                        0,
2431                    )
2432                    .encode()
2433                    .into(),
2434                    FullKey::for_test(
2435                        TableId::new(2),
2436                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2437                        0,
2438                    )
2439                    .encode()
2440                    .into(),
2441                ),
2442                gen_sst_info(
2443                    22,
2444                    vec![2],
2445                    FullKey::for_test(
2446                        TableId::new(0),
2447                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2448                        0,
2449                    )
2450                    .encode()
2451                    .into(),
2452                    FullKey::for_test(
2453                        TableId::new(2),
2454                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2455                        0,
2456                    )
2457                    .encode()
2458                    .into(),
2459                ),
2460                gen_sst_info(
2461                    23,
2462                    vec![2],
2463                    FullKey::for_test(
2464                        TableId::new(0),
2465                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2466                        0,
2467                    )
2468                    .encode()
2469                    .into(),
2470                    FullKey::for_test(
2471                        TableId::new(2),
2472                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2473                        0,
2474                    )
2475                    .encode()
2476                    .into(),
2477                ),
2478                gen_sst_info(
2479                    24,
2480                    vec![2],
2481                    FullKey::for_test(
2482                        TableId::new(2),
2483                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2484                        0,
2485                    )
2486                    .encode()
2487                    .into(),
2488                    FullKey::for_test(
2489                        TableId::new(2),
2490                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2491                        0,
2492                    )
2493                    .encode()
2494                    .into(),
2495                ),
2496                gen_sst_info(
2497                    25,
2498                    vec![2],
2499                    FullKey::for_test(
2500                        TableId::new(0),
2501                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2502                        0,
2503                    )
2504                    .encode()
2505                    .into(),
2506                    FullKey::for_test(
2507                        TableId::new(0),
2508                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2509                        0,
2510                    )
2511                    .encode()
2512                    .into(),
2513                ),
2514            ],
2515            sub_level_id: 101,
2516            level_type: LevelType::Overlapping,
2517            total_file_size: 300,
2518            ..Default::default()
2519        });
2520
2521        {
2522            // split Overlapping level
2523            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2524
2525            let mut new_sst_id = 100.into();
2526            let x = group_split::split_sst_info_for_level_v2(
2527                &mut cg1.l0.sub_levels[0],
2528                &mut new_sst_id,
2529                split_key,
2530            );
2531            // assert_eq!(3, x.len());
2532            // assert_eq!(100, x[0].sst_id);
2533            // assert_eq!(100, x[0].sst_size);
2534            // assert_eq!(101, x[1].sst_id);
2535            // assert_eq!(100, x[1].sst_size);
2536            // assert_eq!(102, x[2].sst_id);
2537            // assert_eq!(100, x[2].sst_size);
2538
2539            let mut right_l0 = OverlappingLevel {
2540                sub_levels: vec![],
2541                total_file_size: 0,
2542                uncompressed_file_size: 0,
2543            };
2544
2545            right_l0.sub_levels.push(Level {
2546                level_idx: 0,
2547                table_infos: x,
2548                sub_level_id: 101,
2549                total_file_size: 100,
2550                level_type: LevelType::Overlapping,
2551                ..Default::default()
2552            });
2553
2554            let right_levels = Levels {
2555                levels: vec![],
2556                l0: right_l0,
2557                ..Default::default()
2558            };
2559
2560            merge_levels(cg1, right_levels);
2561        }
2562
2563        {
2564            // test split empty level
2565            let mut new_sst_id = 100.into();
2566            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2567            let x = group_split::split_sst_info_for_level_v2(
2568                &mut cg1.levels[2],
2569                &mut new_sst_id,
2570                split_key,
2571            );
2572
2573            assert!(x.is_empty());
2574        }
2575
2576        {
2577            // test split to right Nonoverlapping level
2578            let mut cg1 = cg1.clone();
2579            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2580
2581            let mut new_sst_id = 100.into();
2582            let x = group_split::split_sst_info_for_level_v2(
2583                &mut cg1.levels[0],
2584                &mut new_sst_id,
2585                split_key,
2586            );
2587
2588            assert_eq!(3, x.len());
2589            assert_eq!(1, x[0].sst_id);
2590            assert_eq!(100, x[0].sst_size);
2591            assert_eq!(10, x[1].sst_id);
2592            assert_eq!(100, x[1].sst_size);
2593            assert_eq!(11, x[2].sst_id);
2594            assert_eq!(100, x[2].sst_size);
2595
2596            assert_eq!(0, cg1.levels[0].table_infos.len());
2597        }
2598
2599        {
2600            // test split to left Nonoverlapping level
2601            let mut cg1 = cg1.clone();
2602            let split_key = group_split::build_split_key(5, VirtualNode::ZERO);
2603
2604            let mut new_sst_id = 100.into();
2605            let x = group_split::split_sst_info_for_level_v2(
2606                &mut cg1.levels[0],
2607                &mut new_sst_id,
2608                split_key,
2609            );
2610
2611            assert_eq!(0, x.len());
2612            assert_eq!(3, cg1.levels[0].table_infos.len());
2613        }
2614
2615        // {
2616        //     // test split to both Nonoverlapping level
2617        //     let mut cg1 = cg1.clone();
2618        //     let split_key = build_split_key(3, VirtualNode::MAX);
2619
2620        //     let mut new_sst_id = 100;
2621        //     let x = group_split::split_sst_info_for_level_v2(
2622        //         &mut cg1.levels[0],
2623        //         &mut new_sst_id,
2624        //         split_key,
2625        //     );
2626
2627        //     assert_eq!(2, x.len());
2628        //     assert_eq!(100, x[0].sst_id);
2629        //     assert_eq!(100 / 2, x[0].sst_size);
2630        //     assert_eq!(11, x[1].sst_id);
2631        //     assert_eq!(100, x[1].sst_size);
2632        //     assert_eq!(vec![3, 4], x[0].table_ids);
2633
2634        //     assert_eq!(2, cg1.levels[0].table_infos.len());
2635        //     assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2636        //     assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2637        //     assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2638        // }
2639
2640        {
2641            // test split to both Nonoverlapping level
2642            let mut cg1 = cg1.clone();
2643            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2644
2645            let mut new_sst_id = 100.into();
2646            let x = group_split::split_sst_info_for_level_v2(
2647                &mut cg1.levels[0],
2648                &mut new_sst_id,
2649                split_key,
2650            );
2651
2652            assert_eq!(2, x.len());
2653            assert_eq!(100, x[0].sst_id);
2654            assert_eq!(100 / 2, x[0].sst_size);
2655            assert_eq!(11, x[1].sst_id);
2656            assert_eq!(100, x[1].sst_size);
2657            assert_eq!(vec![4], x[1].table_ids);
2658
2659            assert_eq!(2, cg1.levels[0].table_infos.len());
2660            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2661            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2662            assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2663        }
2664    }
2665}