// risingwave_hummock_sdk/compaction_group/hummock_version_ext.rs
//
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_pb::hummock::{
27    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
28};
29use tracing::warn;
30
31use super::group_split::split_sst_with_table_ids;
32use super::{StateTableId, group_split};
33use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
34use crate::compact_task::is_compaction_task_expired;
35use crate::compaction_group::StaticCompactionGroupId;
36use crate::key_range::KeyRangeCommon;
37use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
38use crate::sstable_info::SstableInfo;
39use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
40use crate::version::{
41    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
42    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
43};
44use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId, can_concat};
45
46#[derive(Debug, Clone, Default)]
47pub struct SstDeltaInfo {
48    pub insert_sst_level: u32,
49    pub insert_sst_infos: Vec<SstableInfo>,
50    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
51}
52
53pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
54
55impl<L> HummockVersionCommon<SstableInfo, L> {
56    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
57        self.levels
58            .get(&compaction_group_id)
59            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
60    }
61
62    pub fn get_compaction_group_levels_mut(
63        &mut self,
64        compaction_group_id: CompactionGroupId,
65    ) -> &mut Levels {
66        self.levels
67            .get_mut(&compaction_group_id)
68            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
69    }
70
71    // only scan the sst infos from levels in the specified compaction group (without table change log)
72    pub fn get_sst_ids_by_group_id(
73        &self,
74        compaction_group_id: CompactionGroupId,
75    ) -> impl Iterator<Item = u64> + '_ {
76        self.levels
77            .iter()
78            .filter_map(move |(cg_id, level)| {
79                if *cg_id == compaction_group_id {
80                    Some(level)
81                } else {
82                    None
83                }
84            })
85            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
86            .flat_map(|level| level.table_infos.iter())
87            .map(|s| s.sst_id)
88    }
89
90    pub fn level_iter<F: FnMut(&Level) -> bool>(
91        &self,
92        compaction_group_id: CompactionGroupId,
93        mut f: F,
94    ) {
95        if let Some(levels) = self.levels.get(&compaction_group_id) {
96            for sub_level in &levels.l0.sub_levels {
97                if !f(sub_level) {
98                    return;
99                }
100            }
101            for level in &levels.levels {
102                if !f(level) {
103                    return;
104                }
105            }
106        }
107    }
108
109    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
110        // l0 is currently separated from all levels
111        self.levels
112            .get(&compaction_group_id)
113            .map(|group| group.levels.len() + 1)
114            .unwrap_or(0)
115    }
116
117    pub fn safe_epoch_table_watermarks(
118        &self,
119        existing_table_ids: &[u32],
120    ) -> BTreeMap<u32, TableWatermarks> {
121        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
122    }
123}
124
125pub fn safe_epoch_table_watermarks_impl(
126    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
127    existing_table_ids: &[u32],
128) -> BTreeMap<u32, TableWatermarks> {
129    fn extract_single_table_watermark(
130        table_watermarks: &TableWatermarks,
131    ) -> Option<TableWatermarks> {
132        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
133            Some(TableWatermarks {
134                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
135                direction: table_watermarks.direction,
136                watermark_type: table_watermarks.watermark_type,
137            })
138        } else {
139            None
140        }
141    }
142    table_watermarks
143        .iter()
144        .filter_map(|(table_id, table_watermarks)| {
145            let u32_table_id = table_id.table_id();
146            if !existing_table_ids.contains(&u32_table_id) {
147                None
148            } else {
149                extract_single_table_watermark(table_watermarks)
150                    .map(|table_watermarks| (table_id.table_id, table_watermarks))
151            }
152        })
153        .collect()
154}
155
156pub fn safe_epoch_read_table_watermarks_impl(
157    safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
158) -> BTreeMap<TableId, ReadTableWatermark> {
159    safe_epoch_watermarks
160        .into_iter()
161        .map(|(table_id, watermarks)| {
162            assert_eq!(watermarks.watermarks.len(), 1);
163            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
164            let mut vnode_watermark_map = BTreeMap::new();
165            for vnode_watermark in vnode_watermarks.iter() {
166                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
167                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
168                    assert!(
169                        vnode_watermark_map
170                            .insert(vnode, watermark.clone())
171                            .is_none(),
172                        "duplicate table watermark on vnode {}",
173                        vnode.to_index()
174                    );
175                }
176            }
177            (
178                TableId::from(table_id),
179                ReadTableWatermark {
180                    direction: watermarks.direction,
181                    vnode_watermarks: vnode_watermark_map,
182                },
183            )
184        })
185        .collect()
186}
187
188impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
189    pub fn count_new_ssts_in_group_split(
190        &self,
191        parent_group_id: CompactionGroupId,
192        split_key: Bytes,
193    ) -> u64 {
194        self.levels
195            .get(&parent_group_id)
196            .map_or(0, |parent_levels| {
197                let l0 = &parent_levels.l0;
198                let mut split_count = 0;
199                for sub_level in &l0.sub_levels {
200                    assert!(!sub_level.table_infos.is_empty());
201
202                    if sub_level.level_type == PbLevelType::Overlapping {
203                        // TODO: use table_id / vnode / key_range filter
204                        split_count += sub_level
205                            .table_infos
206                            .iter()
207                            .map(|sst| {
208                                if let group_split::SstSplitType::Both =
209                                    group_split::need_to_split(sst, split_key.clone())
210                                {
211                                    2
212                                } else {
213                                    0
214                                }
215                            })
216                            .sum::<u64>();
217                        continue;
218                    }
219
220                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
221                    let sst = sub_level.table_infos.get(pos).unwrap();
222
223                    if let group_split::SstSplitType::Both =
224                        group_split::need_to_split(sst, split_key.clone())
225                    {
226                        split_count += 2;
227                    }
228                }
229
230                for level in &parent_levels.levels {
231                    if level.table_infos.is_empty() {
232                        continue;
233                    }
234                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
235                    let sst = level.table_infos.get(pos).unwrap();
236                    if let group_split::SstSplitType::Both =
237                        group_split::need_to_split(sst, split_key.clone())
238                    {
239                        split_count += 2;
240                    }
241                }
242
243                split_count
244            })
245    }
246
247    pub fn init_with_parent_group(
248        &mut self,
249        parent_group_id: CompactionGroupId,
250        group_id: CompactionGroupId,
251        member_table_ids: BTreeSet<StateTableId>,
252        new_sst_start_id: u64,
253    ) {
254        let mut new_sst_id = new_sst_start_id;
255        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
256            if new_sst_start_id != 0 {
257                if cfg!(debug_assertions) {
258                    panic!(
259                        "non-zero sst start id {} for NewCompactionGroup",
260                        new_sst_start_id
261                    );
262                } else {
263                    warn!(
264                        new_sst_start_id,
265                        "non-zero sst start id for NewCompactionGroup"
266                    );
267                }
268            }
269            return;
270        } else if !self.levels.contains_key(&parent_group_id) {
271            unreachable!(
272                "non-existing parent group id {} to init from",
273                parent_group_id
274            );
275        }
276        let [parent_levels, cur_levels] = self
277            .levels
278            .get_disjoint_mut([&parent_group_id, &group_id])
279            .map(|res| res.unwrap());
280        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
281        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
282        parent_levels.compaction_group_version_id += 1;
283        cur_levels.compaction_group_version_id += 1;
284        let l0 = &mut parent_levels.l0;
285        {
286            for sub_level in &mut l0.sub_levels {
287                let target_l0 = &mut cur_levels.l0;
288                // Remove SST from sub level may result in empty sub level. It will be purged
289                // whenever another compaction task is finished.
290                let insert_table_infos =
291                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
292                sub_level
293                    .table_infos
294                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
295                    .for_each(|sst_info| {
296                        sub_level.total_file_size -= sst_info.sst_size;
297                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
298                        l0.total_file_size -= sst_info.sst_size;
299                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
300                    });
301                if insert_table_infos.is_empty() {
302                    continue;
303                }
304                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
305                    Ok(idx) => {
306                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
307                    }
308                    Err(idx) => {
309                        insert_new_sub_level(
310                            target_l0,
311                            sub_level.sub_level_id,
312                            sub_level.level_type,
313                            insert_table_infos,
314                            Some(idx),
315                        );
316                    }
317                }
318            }
319
320            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
321        }
322        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
323            let insert_table_infos =
324                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
325            cur_levels.levels[idx].total_file_size += insert_table_infos
326                .iter()
327                .map(|sst| sst.sst_size)
328                .sum::<u64>();
329            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
330                .iter()
331                .map(|sst| sst.uncompressed_file_size)
332                .sum::<u64>();
333            cur_levels.levels[idx]
334                .table_infos
335                .extend(insert_table_infos);
336            cur_levels.levels[idx]
337                .table_infos
338                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
339            assert!(can_concat(&cur_levels.levels[idx].table_infos));
340            level
341                .table_infos
342                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
343                .for_each(|sst_info| {
344                    level.total_file_size -= sst_info.sst_size;
345                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
346                });
347        }
348
349        assert!(
350            parent_levels
351                .l0
352                .sub_levels
353                .iter()
354                .all(|level| !level.table_infos.is_empty())
355        );
356        assert!(
357            cur_levels
358                .l0
359                .sub_levels
360                .iter()
361                .all(|level| !level.table_infos.is_empty())
362        );
363    }
364
365    pub fn build_sst_delta_infos(
366        &self,
367        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
368    ) -> Vec<SstDeltaInfo> {
369        let mut infos = vec![];
370
371        // Skip trivial move delta for refiller
372        // The trivial move task only changes the position of the sst in the lsm, it does not modify the object information corresponding to the sst, and does not need to re-execute the refill.
373        if version_delta.trivial_move {
374            return infos;
375        }
376
377        for (group_id, group_deltas) in &version_delta.group_deltas {
378            let mut info = SstDeltaInfo::default();
379
380            let mut removed_l0_ssts: BTreeSet<u64> = BTreeSet::new();
381            let mut removed_ssts: BTreeMap<u32, BTreeSet<u64>> = BTreeMap::new();
382
383            // Build only if all deltas are intra level deltas.
384            if !group_deltas.group_deltas.iter().all(|delta| {
385                matches!(
386                    delta,
387                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
388                )
389            }) {
390                continue;
391            }
392
393            // TODO(MrCroxx): At most one insert delta is allowed here. It's okay for now with the
394            // current `hummock::manager::gen_version_delta` implementation. Better refactor the
395            // struct to reduce conventions.
396            for group_delta in &group_deltas.group_deltas {
397                match group_delta {
398                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
399                        if !inserted_table_infos.is_empty() {
400                            info.insert_sst_level = 0;
401                            info.insert_sst_infos
402                                .extend(inserted_table_infos.iter().cloned());
403                        }
404                    }
405                    GroupDeltaCommon::IntraLevel(intra_level) => {
406                        if !intra_level.inserted_table_infos.is_empty() {
407                            info.insert_sst_level = intra_level.level_idx;
408                            info.insert_sst_infos
409                                .extend(intra_level.inserted_table_infos.iter().cloned());
410                        }
411                        if !intra_level.removed_table_ids.is_empty() {
412                            for id in &intra_level.removed_table_ids {
413                                if intra_level.level_idx == 0 {
414                                    removed_l0_ssts.insert(*id);
415                                } else {
416                                    removed_ssts
417                                        .entry(intra_level.level_idx)
418                                        .or_default()
419                                        .insert(*id);
420                                }
421                            }
422                        }
423                    }
424                    GroupDeltaCommon::GroupConstruct(_)
425                    | GroupDeltaCommon::GroupDestroy(_)
426                    | GroupDeltaCommon::GroupMerge(_) => {}
427                }
428            }
429
430            let group = self.levels.get(group_id).unwrap();
431            for l0_sub_level in &group.level0().sub_levels {
432                for sst_info in &l0_sub_level.table_infos {
433                    if removed_l0_ssts.remove(&sst_info.sst_id) {
434                        info.delete_sst_object_ids.push(sst_info.object_id);
435                    }
436                }
437            }
438            for level in &group.levels {
439                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
440                    for sst_info in &level.table_infos {
441                        if removed_level_ssts.remove(&sst_info.sst_id) {
442                            info.delete_sst_object_ids.push(sst_info.object_id);
443                        }
444                    }
445                    if !removed_level_ssts.is_empty() {
446                        tracing::error!(
447                            "removed_level_ssts is not empty: {:?}",
448                            removed_level_ssts,
449                        );
450                    }
451                    debug_assert!(removed_level_ssts.is_empty());
452                }
453            }
454
455            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
456                tracing::error!(
457                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
458                    removed_l0_ssts,
459                    removed_ssts
460                );
461            }
462            debug_assert!(removed_l0_ssts.is_empty());
463            debug_assert!(removed_ssts.is_empty());
464
465            infos.push(info);
466        }
467
468        infos
469    }
470
471    pub fn apply_version_delta(
472        &mut self,
473        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
474    ) {
475        assert_eq!(self.id, version_delta.prev_id);
476
477        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
478            &version_delta.state_table_info_delta,
479            &version_delta.removed_table_ids,
480        );
481
482        #[expect(deprecated)]
483        {
484            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
485                is_commit_epoch = true;
486                tracing::trace!(
487                    "max committed epoch bumped but no table committed epoch is changed"
488                );
489            }
490        }
491
492        // apply to `levels`, which is different compaction groups
493        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
494            let mut is_applied_l0_compact = false;
495            for group_delta in &group_deltas.group_deltas {
496                match group_delta {
497                    GroupDeltaCommon::GroupConstruct(group_construct) => {
498                        let mut new_levels = build_initial_compaction_group_levels(
499                            *compaction_group_id,
500                            group_construct.get_group_config().unwrap(),
501                        );
502                        let parent_group_id = group_construct.parent_group_id;
503                        new_levels.parent_group_id = parent_group_id;
504                        #[expect(deprecated)]
505                        // for backward-compatibility of previous hummock version delta
506                        new_levels
507                            .member_table_ids
508                            .clone_from(&group_construct.table_ids);
509                        self.levels.insert(*compaction_group_id, new_levels);
510                        let member_table_ids = if group_construct.version()
511                            >= CompatibilityVersion::NoMemberTableIds
512                        {
513                            self.state_table_info
514                                .compaction_group_member_table_ids(*compaction_group_id)
515                                .iter()
516                                .map(|table_id| table_id.table_id)
517                                .collect()
518                        } else {
519                            #[expect(deprecated)]
520                            // for backward-compatibility of previous hummock version delta
521                            BTreeSet::from_iter(group_construct.table_ids.clone())
522                        };
523
524                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
525                            let split_key = if group_construct.split_key.is_some() {
526                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
527                            } else {
528                                None
529                            };
530                            self.init_with_parent_group_v2(
531                                parent_group_id,
532                                *compaction_group_id,
533                                group_construct.get_new_sst_start_id(),
534                                split_key.clone(),
535                            );
536                        } else {
537                            // for backward-compatibility of previous hummock version delta
538                            self.init_with_parent_group(
539                                parent_group_id,
540                                *compaction_group_id,
541                                member_table_ids,
542                                group_construct.get_new_sst_start_id(),
543                            );
544                        }
545                    }
546                    GroupDeltaCommon::GroupMerge(group_merge) => {
547                        tracing::info!(
548                            "group_merge left {:?} right {:?}",
549                            group_merge.left_group_id,
550                            group_merge.right_group_id
551                        );
552                        self.merge_compaction_group(
553                            group_merge.left_group_id,
554                            group_merge.right_group_id,
555                        )
556                    }
557                    GroupDeltaCommon::IntraLevel(level_delta) => {
558                        let levels =
559                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
560                                panic!("compaction group {} does not exist", compaction_group_id)
561                            });
562                        if is_commit_epoch {
563                            assert!(
564                                level_delta.removed_table_ids.is_empty(),
565                                "no sst should be deleted when committing an epoch"
566                            );
567
568                            let IntraLevelDelta {
569                                level_idx,
570                                l0_sub_level_id,
571                                inserted_table_infos,
572                                ..
573                            } = level_delta;
574                            {
575                                assert_eq!(
576                                    *level_idx, 0,
577                                    "we should only add to L0 when we commit an epoch."
578                                );
579                                if !inserted_table_infos.is_empty() {
580                                    insert_new_sub_level(
581                                        &mut levels.l0,
582                                        *l0_sub_level_id,
583                                        PbLevelType::Overlapping,
584                                        inserted_table_infos.clone(),
585                                        None,
586                                    );
587                                }
588                            }
589                        } else {
590                            // The delta is caused by compaction.
591                            levels.apply_compact_ssts(
592                                level_delta,
593                                self.state_table_info
594                                    .compaction_group_member_table_ids(*compaction_group_id),
595                            );
596                            if level_delta.level_idx == 0 {
597                                is_applied_l0_compact = true;
598                            }
599                        }
600                    }
601                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
602                        let levels =
603                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
604                                panic!("compaction group {} does not exist", compaction_group_id)
605                            });
606                        assert!(is_commit_epoch);
607
608                        if !inserted_table_infos.is_empty() {
609                            let next_l0_sub_level_id = levels
610                                .l0
611                                .sub_levels
612                                .last()
613                                .map(|level| level.sub_level_id + 1)
614                                .unwrap_or(1);
615
616                            insert_new_sub_level(
617                                &mut levels.l0,
618                                next_l0_sub_level_id,
619                                PbLevelType::Overlapping,
620                                inserted_table_infos.clone(),
621                                None,
622                            );
623                        }
624                    }
625                    GroupDeltaCommon::GroupDestroy(_) => {
626                        self.levels.remove(compaction_group_id);
627                    }
628                }
629            }
630            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
631            {
632                levels.post_apply_l0_compact();
633            }
634        }
635        self.id = version_delta.id;
636        #[expect(deprecated)]
637        {
638            self.max_committed_epoch = version_delta.max_committed_epoch;
639        }
640
641        // apply to table watermark
642
643        // Store the table watermarks that needs to be updated. None means to remove the table watermark of the table id
644        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
645            HashMap::new();
646
647        // apply to table watermark
648        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
649            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
650                if version_delta.removed_table_ids.contains(table_id) {
651                    modified_table_watermarks.insert(*table_id, None);
652                } else {
653                    let mut current_table_watermarks = (**current_table_watermarks).clone();
654                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
655                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
656                }
657            } else {
658                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
659            }
660        }
661        for (table_id, table_watermarks) in &self.table_watermarks {
662            let safe_epoch = if let Some(state_table_info) =
663                self.state_table_info.info().get(table_id)
664                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
665                && state_table_info.committed_epoch > *oldest_epoch
666            {
667                // safe epoch has progressed, need further clear.
668                state_table_info.committed_epoch
669            } else {
670                // safe epoch not progressed or the table has been removed. No need to truncate
671                continue;
672            };
673            let table_watermarks = modified_table_watermarks
674                .entry(*table_id)
675                .or_insert_with(|| Some((**table_watermarks).clone()));
676            if let Some(table_watermarks) = table_watermarks {
677                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
678            }
679        }
680        // apply the staging table watermark to hummock version
681        for (table_id, table_watermarks) in modified_table_watermarks {
682            if let Some(table_watermarks) = table_watermarks {
683                self.table_watermarks
684                    .insert(table_id, Arc::new(table_watermarks));
685            } else {
686                self.table_watermarks.remove(&table_id);
687            }
688        }
689
690        // apply to table change log
691        Self::apply_change_log_delta(
692            &mut self.table_change_log,
693            &version_delta.change_log_delta,
694            &version_delta.removed_table_ids,
695            &version_delta.state_table_info_delta,
696            &changed_table_info,
697        );
698    }
699
700    pub fn apply_change_log_delta<T: Clone>(
701        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
702        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
703        removed_table_ids: &HashSet<TableId>,
704        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
705        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
706    ) {
707        for (table_id, change_log_delta) in change_log_delta {
708            let new_change_log = &change_log_delta.new_log;
709            match table_change_log.entry(*table_id) {
710                Entry::Occupied(entry) => {
711                    let change_log = entry.into_mut();
712                    change_log.add_change_log(new_change_log.clone());
713                }
714                Entry::Vacant(entry) => {
715                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
716                }
717            };
718        }
719
720        // If a table has no new change log entry (even an empty one), it means we have stopped maintained
721        // the change log for the table, and then we will remove the table change log.
722        // The table change log will also be removed when the table id is removed.
723        table_change_log.retain(|table_id, _| {
724            if removed_table_ids.contains(table_id) {
725                return false;
726            }
727            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
728                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
729                // the table exists previously, and its committed epoch has progressed.
730            } else {
731                // otherwise, the table change log should be kept anyway
732                return true;
733            }
734            let contains = change_log_delta.contains_key(table_id);
735            if !contains {
736                warn!(
737                        ?table_id,
738                        "table change log dropped due to no further change log at newly committed epoch",
739                    );
740            }
741            contains
742        });
743
744        // truncate the remaining table change log
745        for (table_id, change_log_delta) in change_log_delta {
746            if let Some(change_log) = table_change_log.get_mut(table_id) {
747                change_log.truncate(change_log_delta.truncate_epoch);
748            }
749        }
750    }
751
752    pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
753        let mut ret: BTreeMap<_, _> = BTreeMap::new();
754        for (compaction_group_id, group) in &self.levels {
755            let mut levels = vec![];
756            levels.extend(group.l0.sub_levels.iter());
757            levels.extend(group.levels.iter());
758            for level in levels {
759                for table_info in &level.table_infos {
760                    if table_info.sst_id == table_info.object_id {
761                        continue;
762                    }
763                    let object_id = table_info.object_id;
764                    let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
765                    entry
766                        .entry(*compaction_group_id)
767                        .or_default()
768                        .push(table_info.sst_id)
769                }
770            }
771        }
772        ret
773    }
774
775    pub fn merge_compaction_group(
776        &mut self,
777        left_group_id: CompactionGroupId,
778        right_group_id: CompactionGroupId,
779    ) {
780        // Double check
781        let left_group_id_table_ids = self
782            .state_table_info
783            .compaction_group_member_table_ids(left_group_id)
784            .iter()
785            .map(|table_id| table_id.table_id);
786        let right_group_id_table_ids = self
787            .state_table_info
788            .compaction_group_member_table_ids(right_group_id)
789            .iter()
790            .map(|table_id| table_id.table_id);
791
792        assert!(
793            left_group_id_table_ids
794                .chain(right_group_id_table_ids)
795                .is_sorted()
796        );
797
798        let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
799        let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
800            panic!(
801                "compaction group should exist right {} all {:?}",
802                right_group_id, total_cg
803            )
804        });
805
806        let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
807            panic!(
808                "compaction group should exist left {} all {:?}",
809                left_group_id, total_cg
810            )
811        });
812
813        group_split::merge_levels(left_levels, right_levels);
814    }
815
    /// Moves SSTs from compaction group `parent_group_id` into the newly split group `group_id`.
    ///
    /// - If `parent_group_id` is `StaticCompactionGroupId::NewCompactionGroup`, nothing is moved.
    ///   A non-zero `new_sst_start_id` then panics in debug builds and only logs a warning
    ///   otherwise.
    /// - Otherwise, the `compaction_group_version_id` of both groups is incremented. Then, for
    ///   every L0 sub-level and every non-L0 level of the parent,
    ///   `group_split::split_sst_info_for_level_v2` is called with `split_key`, and the SSTs it
    ///   returns are inserted into the matching L0 sub-level / level of `group_id`. Parent SSTs
    ///   left with no table ids are removed, and the affected file-size statistics are
    ///   decremented.
    /// - When `split_key` is `None`, no SST is moved, but the version ids are still incremented.
    ///
    /// `new_sst_start_id` seeds a counter that is passed by `&mut` into the split helper
    /// (presumably to allocate ids for newly created SSTs — confirm in `group_split`).
    ///
    /// # Panics
    /// Panics in any of these cases:
    /// - the parent group is absent from `self.levels`;
    /// - `group_id` is absent from `self.levels`, or equals `parent_group_id` (via
    ///   `get_disjoint_mut` / `unwrap`);
    /// - a non-L0 level of the child is not concat-able after insertion;
    /// - an L0 sub-level of either group ends up empty.
    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: u64,
        split_key: Option<Bytes>,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            // A brand-new group has no parent data to split, so no SST id should be reserved.
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        // Borrow both groups mutably at once; `get_disjoint_mut` panics if the two ids are
        // equal, and `unwrap` panics if `group_id` has no levels yet.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        // L0: split each parent sub-level and move the split-off SSTs into the child's L0.
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Removing SSTs from a sub-level may leave it empty; such sub-levels are dropped
                // by the `retain` right after this loop.
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                if insert_table_infos.is_empty() {
                    continue;
                }

                // Drop parent SSTs that no longer contain any table, and keep both the
                // sub-level's and L0's size statistics in sync.
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                // `Ok(idx)`: add to an existing child sub-level at `idx`;
                // `Err(idx)`: create a new child sub-level at position `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }

        // Non-L0 levels: split each parent level into the child level with the same index.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Keep the child level ordered by key range so it remains concat-able.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            // Drop parent SSTs that no longer contain any table and update the level's sizes.
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        // Neither group may be left with an empty L0 sub-level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
953}
954
955impl<T> HummockVersionCommon<T>
956where
957    T: SstableIdReader + ObjectIdReader,
958{
959    pub fn get_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
960        self.get_sst_infos(exclude_change_log)
961            .map(|s| s.object_id())
962            .collect()
963    }
964
965    pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
966        self.get_sst_infos(exclude_change_log)
967            .map(|s| s.sst_id())
968            .collect()
969    }
970
971    pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
972        let may_table_change_log = if exclude_change_log {
973            None
974        } else {
975            Some(self.table_change_log.values())
976        };
977        self.get_combined_levels()
978            .flat_map(|level| level.table_infos.iter())
979            .chain(
980                may_table_change_log
981                    .map(|v| {
982                        v.flat_map(|table_change_log| {
983                            table_change_log.iter().flat_map(|epoch_change_log| {
984                                epoch_change_log
985                                    .old_value
986                                    .iter()
987                                    .chain(epoch_change_log.new_value.iter())
988                            })
989                        })
990                    })
991                    .into_iter()
992                    .flatten(),
993            )
994    }
995}
996
impl Levels {
    /// Applies one compaction-generated intra-level delta to these levels.
    ///
    /// If `is_compaction_task_expired` reports the delta's `compaction_group_version_id` as
    /// expired relative to `self.compaction_group_version_id`, a warning is logged and nothing
    /// is changed. Otherwise, the method performs two steps:
    /// 1. Deletion. SSTs whose `sst_id` is in the delta's `removed_table_ids` are removed.
    ///    Despite its name, that field holds SST ids, and is bound locally as
    ///    `delete_sst_ids_set`. They are removed from every L0 sub-level when `level_idx == 0`,
    ///    or from level `level_idx` otherwise.
    /// 2. Insertion. The delta's `inserted_table_infos` go into the L0 sub-level whose id is
    ///    `l0_sub_level_id` when `level_idx == 0`, or into level `level_idx` otherwise. This may
    ///    also update that level's `vnode_partition_count`; see the inline comments.
    ///
    /// `member_table_ids` is only consulted to decide whether `vnode_partition_count` may
    /// change. It is presumably the member tables of this compaction group; confirm at the
    /// call site.
    ///
    /// # Panics
    /// Panics in any of these cases:
    /// - `level_idx` is out of range for `self.levels`;
    /// - SSTs are inserted into L0 and no sub-level has id `l0_sub_level_id`;
    /// - the target level is not concat-able after insertion (see `level_insert_ssts`).
    pub(crate) fn apply_compact_ssts(
        &mut self,
        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
        member_table_ids: &BTreeSet<TableId>,
    ) {
        let IntraLevelDeltaCommon {
            level_idx,
            l0_sub_level_id,
            inserted_table_infos: insert_table_infos,
            vnode_partition_count,
            removed_table_ids: delete_sst_ids_set,
            compaction_group_version_id,
        } = level_delta;
        let new_vnode_partition_count = *vnode_partition_count;

        // Reject deltas produced by tasks created before the last group-level operation
        // (e.g. a split), which bumps `compaction_group_version_id`.
        if is_compaction_task_expired(
            self.compaction_group_version_id,
            *compaction_group_version_id,
        ) {
            warn!(
                current_compaction_group_version_id = self.compaction_group_version_id,
                delta_compaction_group_version_id = compaction_group_version_id,
                level_idx,
                l0_sub_level_id,
                insert_table_infos = ?insert_table_infos
                    .iter()
                    .map(|sst| (sst.sst_id, sst.object_id))
                    .collect_vec(),
                ?delete_sst_ids_set,
                "This VersionDelta may be committed by an expired compact task. Please check it."
            );
            return;
        }
        if !delete_sst_ids_set.is_empty() {
            if *level_idx == 0 {
                // For L0 the delta does not identify a sub-level to delete from, so every
                // sub-level is scanned.
                for level in &mut self.l0.sub_levels {
                    level_delete_ssts(level, delete_sst_ids_set);
                }
            } else {
                let idx = *level_idx as usize - 1;
                level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
            }
        }

        if !insert_table_infos.is_empty() {
            let insert_sst_level_id = *level_idx;
            let insert_sub_level_id = *l0_sub_level_id;
            if insert_sst_level_id == 0 {
                let l0 = &mut self.l0;
                // Binary search for the target sub-level by id.
                let index = l0
                    .sub_levels
                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
                // Note: the message labels the value "sub level idx", but it prints the
                // sub-level *id* being searched for.
                assert!(
                    index < l0.sub_levels.len()
                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
                    "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
                    insert_sub_level_id,
                    delete_sst_ids_set,
                    l0.sub_levels
                        .iter()
                        .map(|level| level.sub_level_id)
                        .collect_vec()
                );
                // The emptiness check runs after the deletion above, i.e. it asks whether this
                // task replaced every SST of the sub-level.
                if l0.sub_levels[index].table_infos.is_empty()
                    && member_table_ids.len() == 1
                    && insert_table_infos.iter().all(|sst| {
                        sst.table_ids.len() == 1
                            && sst.table_ids[0]
                                == member_table_ids.iter().next().expect("non-empty").table_id
                    })
                {
                    // Only change vnode_partition_count for group which has only one state-table.
                    // Only change vnode_partition_count for level which update all sst files in this compact task.
                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
                }
                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
            } else {
                let idx = insert_sst_level_id as usize - 1;
                // If the level was fully replaced and every new SST holds a single table, adopt
                // the delta's partition count. Otherwise, reset a non-zero count to 0 when the
                // delta carries 0 and the group has more than one member table.
                if self.levels[idx].table_infos.is_empty()
                    && insert_table_infos
                        .iter()
                        .all(|sst| sst.table_ids.len() == 1)
                {
                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
                } else if self.levels[idx].vnode_partition_count != 0
                    && new_vnode_partition_count == 0
                    && member_table_ids.len() > 1
                {
                    self.levels[idx].vnode_partition_count = 0;
                }
                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
            }
        }
    }

    /// Removes empty L0 sub-levels. It then recomputes L0's `total_file_size` and
    /// `uncompressed_file_size` as the sums over the remaining sub-levels.
    pub(crate) fn post_apply_l0_compact(&mut self) {
        {
            self.l0
                .sub_levels
                .retain(|level| !level.table_infos.is_empty());
            self.l0.total_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.total_file_size)
                .sum::<u64>();
            self.l0.uncompressed_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.uncompressed_file_size)
                .sum::<u64>();
        }
    }
}
1113
1114impl<T, L> HummockVersionCommon<T, L> {
1115    pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1116        self.levels
1117            .values()
1118            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1119    }
1120}
1121
1122pub fn build_initial_compaction_group_levels(
1123    group_id: CompactionGroupId,
1124    compaction_config: &CompactionConfig,
1125) -> Levels {
1126    let mut levels = vec![];
1127    for l in 0..compaction_config.get_max_level() {
1128        levels.push(Level {
1129            level_idx: (l + 1) as u32,
1130            level_type: PbLevelType::Nonoverlapping,
1131            table_infos: vec![],
1132            total_file_size: 0,
1133            sub_level_id: 0,
1134            uncompressed_file_size: 0,
1135            vnode_partition_count: 0,
1136        });
1137    }
1138    #[expect(deprecated)] // for backward-compatibility of previous hummock version delta
1139    Levels {
1140        levels,
1141        l0: OverlappingLevel {
1142            sub_levels: vec![],
1143            total_file_size: 0,
1144            uncompressed_file_size: 0,
1145        },
1146        group_id,
1147        parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
1148        member_table_ids: vec![],
1149        compaction_group_version_id: 0,
1150    }
1151}
1152
1153fn split_sst_info_for_level(
1154    member_table_ids: &BTreeSet<u32>,
1155    level: &mut Level,
1156    new_sst_id: &mut u64,
1157) -> Vec<SstableInfo> {
1158    // Remove SST from sub level may result in empty sub level. It will be purged
1159    // whenever another compaction task is finished.
1160    let mut insert_table_infos = vec![];
1161    for sst_info in &mut level.table_infos {
1162        let removed_table_ids = sst_info
1163            .table_ids
1164            .iter()
1165            .filter(|table_id| member_table_ids.contains(table_id))
1166            .cloned()
1167            .collect_vec();
1168        let sst_size = sst_info.sst_size;
1169        if sst_size / 2 == 0 {
1170            tracing::warn!(
1171                id = sst_info.sst_id,
1172                object_id = sst_info.object_id,
1173                sst_size = sst_info.sst_size,
1174                file_size = sst_info.file_size,
1175                "Sstable sst_size is under expected",
1176            );
1177        };
1178        if !removed_table_ids.is_empty() {
1179            let (modified_sst, branch_sst) = split_sst_with_table_ids(
1180                sst_info,
1181                new_sst_id,
1182                sst_size / 2,
1183                sst_size / 2,
1184                member_table_ids.iter().cloned().collect_vec(),
1185            );
1186            *sst_info = modified_sst;
1187            insert_table_infos.push(branch_sst);
1188        }
1189    }
1190    insert_table_infos
1191}
1192
1193/// Gets all compaction group ids.
1194pub fn get_compaction_group_ids(
1195    version: &HummockVersion,
1196) -> impl Iterator<Item = CompactionGroupId> + '_ {
1197    version.levels.keys().cloned()
1198}
1199
1200pub fn get_table_compaction_group_id_mapping(
1201    version: &HummockVersion,
1202) -> HashMap<StateTableId, CompactionGroupId> {
1203    version
1204        .state_table_info
1205        .info()
1206        .iter()
1207        .map(|(table_id, info)| (table_id.table_id, info.compaction_group_id))
1208        .collect()
1209}
1210
1211/// Gets all SSTs in `group_id`
1212pub fn get_compaction_group_ssts(
1213    version: &HummockVersion,
1214    group_id: CompactionGroupId,
1215) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1216    let group_levels = version.get_compaction_group_levels(group_id);
1217    group_levels
1218        .l0
1219        .sub_levels
1220        .iter()
1221        .rev()
1222        .chain(group_levels.levels.iter())
1223        .flat_map(|level| {
1224            level
1225                .table_infos
1226                .iter()
1227                .map(|table_info| (table_info.object_id, table_info.sst_id))
1228        })
1229}
1230
1231pub fn new_sub_level(
1232    sub_level_id: u64,
1233    level_type: PbLevelType,
1234    table_infos: Vec<SstableInfo>,
1235) -> Level {
1236    if level_type == PbLevelType::Nonoverlapping {
1237        debug_assert!(
1238            can_concat(&table_infos),
1239            "sst of non-overlapping level is not concat-able: {:?}",
1240            table_infos
1241        );
1242    }
1243    let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1244    let uncompressed_file_size = table_infos
1245        .iter()
1246        .map(|table| table.uncompressed_file_size)
1247        .sum();
1248    Level {
1249        level_idx: 0,
1250        level_type,
1251        table_infos,
1252        total_file_size,
1253        sub_level_id,
1254        uncompressed_file_size,
1255        vnode_partition_count: 0,
1256    }
1257}
1258
1259pub fn add_ssts_to_sub_level(
1260    l0: &mut OverlappingLevel,
1261    sub_level_idx: usize,
1262    insert_table_infos: Vec<SstableInfo>,
1263) {
1264    insert_table_infos.iter().for_each(|sst| {
1265        l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1266        l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1267        l0.total_file_size += sst.sst_size;
1268        l0.uncompressed_file_size += sst.uncompressed_file_size;
1269    });
1270    l0.sub_levels[sub_level_idx]
1271        .table_infos
1272        .extend(insert_table_infos);
1273    if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1274        l0.sub_levels[sub_level_idx]
1275            .table_infos
1276            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1277        assert!(
1278            can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1279            "sstable ids: {:?}",
1280            l0.sub_levels[sub_level_idx]
1281                .table_infos
1282                .iter()
1283                .map(|sst| sst.sst_id)
1284                .collect_vec()
1285        );
1286    }
1287}
1288
1289/// `None` value of `sub_level_insert_hint` means append.
1290pub fn insert_new_sub_level(
1291    l0: &mut OverlappingLevel,
1292    insert_sub_level_id: u64,
1293    level_type: PbLevelType,
1294    insert_table_infos: Vec<SstableInfo>,
1295    sub_level_insert_hint: Option<usize>,
1296) {
1297    if insert_sub_level_id == u64::MAX {
1298        return;
1299    }
1300    let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1301        insert_pos
1302    } else {
1303        if let Some(newest_level) = l0.sub_levels.last() {
1304            assert!(
1305                newest_level.sub_level_id < insert_sub_level_id,
1306                "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1307                newest_level.sub_level_id,
1308                insert_sub_level_id,
1309                l0,
1310            );
1311        }
1312        l0.sub_levels.len()
1313    };
1314    #[cfg(debug_assertions)]
1315    {
1316        if insert_pos > 0 {
1317            if let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1) {
1318                debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1319            }
1320        }
1321        if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1322            debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1323        }
1324    }
1325    // All files will be committed in one new Overlapping sub-level and become
1326    // Nonoverlapping  after at least one compaction.
1327    let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1328    l0.total_file_size += level.total_file_size;
1329    l0.uncompressed_file_size += level.uncompressed_file_size;
1330    l0.sub_levels.insert(insert_pos, level);
1331}
1332
1333/// Delete sstables if the table id is in the id set.
1334///
1335/// Return `true` if some sst is deleted, and `false` is the deletion is trivial
1336fn level_delete_ssts(
1337    operand: &mut Level,
1338    delete_sst_ids_superset: &HashSet<HummockSstableId>,
1339) -> bool {
1340    let original_len = operand.table_infos.len();
1341    operand
1342        .table_infos
1343        .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1344    operand.total_file_size = operand
1345        .table_infos
1346        .iter()
1347        .map(|table| table.sst_size)
1348        .sum::<u64>();
1349    operand.uncompressed_file_size = operand
1350        .table_infos
1351        .iter()
1352        .map(|table| table.uncompressed_file_size)
1353        .sum::<u64>();
1354    original_len != operand.table_infos.len()
1355}
1356
1357fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1358    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1359        format!(
1360            "sstable ids: {:?}",
1361            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1362        )
1363    }
1364    operand.total_file_size += insert_table_infos
1365        .iter()
1366        .map(|sst| sst.sst_size)
1367        .sum::<u64>();
1368    operand.uncompressed_file_size += insert_table_infos
1369        .iter()
1370        .map(|sst| sst.uncompressed_file_size)
1371        .sum::<u64>();
1372    if operand.level_type == PbLevelType::Overlapping {
1373        operand.level_type = PbLevelType::Nonoverlapping;
1374        operand
1375            .table_infos
1376            .extend(insert_table_infos.iter().cloned());
1377        operand
1378            .table_infos
1379            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1380        assert!(
1381            can_concat(&operand.table_infos),
1382            "{}",
1383            display_sstable_infos(&operand.table_infos)
1384        );
1385    } else if !insert_table_infos.is_empty() {
1386        let sorted_insert: Vec<_> = insert_table_infos
1387            .iter()
1388            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1389            .cloned()
1390            .collect();
1391        let first = &sorted_insert[0];
1392        let last = &sorted_insert[sorted_insert.len() - 1];
1393        let pos = operand
1394            .table_infos
1395            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1396        if pos >= operand.table_infos.len()
1397            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1398        {
1399            operand.table_infos.splice(pos..pos, sorted_insert);
1400            // Validate the inserted SST batch along with the two SSTs that precede and follow it.
1401            let validate_range = operand
1402                .table_infos
1403                .iter()
1404                .skip(pos.saturating_sub(1))
1405                .take(insert_table_infos.len() + 2)
1406                .collect_vec();
1407            assert!(
1408                can_concat(&validate_range),
1409                "{}",
1410                display_sstable_infos(&validate_range),
1411            );
1412        } else {
1413            // If this branch is reached, it indicates some unexpected behavior in compaction.
1414            // Here we issue a warning and fall back to insert one by one.
1415            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1416            for i in insert_table_infos {
1417                let pos = operand
1418                    .table_infos
1419                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1420                operand.table_infos.insert(pos, i.clone());
1421            }
1422            assert!(
1423                can_concat(&operand.table_infos),
1424                "{}",
1425                display_sstable_infos(&operand.table_infos)
1426            );
1427        }
1428    }
1429}
1430
1431pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockSstableObjectId, u64> {
1432    version
1433        .levels
1434        .values()
1435        .flat_map(|cg| {
1436            cg.level0()
1437                .sub_levels
1438                .iter()
1439                .chain(cg.levels.iter())
1440                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1441        })
1442        .chain(version.table_change_log.values().flat_map(|c| {
1443            c.iter().flat_map(|l| {
1444                l.old_value
1445                    .iter()
1446                    .chain(l.new_value.iter())
1447                    .map(|t| (t.object_id, t.file_size))
1448            })
1449        }))
1450        .collect()
1451}
1452
1453/// Verify the validity of a `HummockVersion` and return a list of violations if any.
1454/// Currently this method is only used by risectl validate-version.
1455pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1456    let mut res = Vec::new();
1457    // Ensure each table maps to only one compaction group
1458    for (group_id, levels) in &version.levels {
1459        // Ensure compaction group id matches
1460        if levels.group_id != *group_id {
1461            res.push(format!(
1462                "GROUP {}: inconsistent group id {} in Levels",
1463                group_id, levels.group_id
1464            ));
1465        }
1466
1467        let validate_level = |group: CompactionGroupId,
1468                              expected_level_idx: u32,
1469                              level: &Level,
1470                              res: &mut Vec<String>| {
1471            let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1472            if level.level_idx == 0 {
1473                level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1474                // Ensure sub-level is not empty
1475                if level.table_infos.is_empty() {
1476                    res.push(format!("{}: empty level", level_identifier));
1477                }
1478            } else if level.level_type != PbLevelType::Nonoverlapping {
1479                // Ensure non-L0 level is non-overlapping level
1480                res.push(format!(
1481                    "{}: level type {:?} is not non-overlapping",
1482                    level_identifier, level.level_type
1483                ));
1484            }
1485
1486            // Ensure level idx matches
1487            if level.level_idx != expected_level_idx {
1488                res.push(format!(
1489                    "{}: mismatched level idx {}",
1490                    level_identifier, expected_level_idx
1491                ));
1492            }
1493
1494            let mut prev_table_info: Option<&SstableInfo> = None;
1495            for table_info in &level.table_infos {
1496                // Ensure table_ids are sorted and unique
1497                if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1498                    res.push(format!(
1499                        "{} SST {}: table_ids not sorted",
1500                        level_identifier, table_info.object_id
1501                    ));
1502                }
1503
1504                // Ensure SSTs in non-overlapping level have non-overlapping key range
1505                if level.level_type == PbLevelType::Nonoverlapping {
1506                    if let Some(prev) = prev_table_info.take() {
1507                        if prev
1508                            .key_range
1509                            .compare_right_with(&table_info.key_range.left)
1510                            != Ordering::Less
1511                        {
1512                            res.push(format!(
1513                                "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1514                                level_identifier, table_info.object_id, prev, table_info
1515                            ));
1516                        }
1517                    }
1518                    let _ = prev_table_info.insert(table_info);
1519                }
1520            }
1521        };
1522
1523        let l0 = &levels.l0;
1524        let mut prev_sub_level_id = u64::MAX;
1525        for sub_level in &l0.sub_levels {
1526            // Ensure sub_level_id is sorted and unique
1527            if sub_level.sub_level_id >= prev_sub_level_id {
1528                res.push(format!(
1529                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1530                    group_id, sub_level.level_idx, prev_sub_level_id
1531                ));
1532            }
1533            prev_sub_level_id = sub_level.sub_level_id;
1534
1535            validate_level(*group_id, 0, sub_level, &mut res);
1536        }
1537
1538        for idx in 1..=levels.levels.len() {
1539            validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1540        }
1541    }
1542    res
1543}
1544
1545#[cfg(test)]
1546mod tests {
1547    use std::collections::{HashMap, HashSet};
1548
1549    use bytes::Bytes;
1550    use risingwave_common::catalog::TableId;
1551    use risingwave_common::hash::VirtualNode;
1552    use risingwave_common::util::epoch::test_epoch;
1553    use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1554
1555    use super::group_split;
1556    use crate::HummockVersionId;
1557    use crate::compaction_group::group_split::*;
1558    use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1559    use crate::key::{FullKey, gen_key_from_str};
1560    use crate::key_range::KeyRange;
1561    use crate::level::{Level, Levels, OverlappingLevel};
1562    use crate::sstable_info::{SstableInfo, SstableInfoInner};
1563    use crate::version::{
1564        GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1565    };
1566
1567    fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1568        gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1569    }
1570
1571    fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1572        let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1573        let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1574        let full_key_l = FullKey::for_test(
1575            TableId::new(*table_ids.first().unwrap()),
1576            table_key_l,
1577            epoch,
1578        )
1579        .encode();
1580        let full_key_r =
1581            FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1582                .encode();
1583
1584        SstableInfoInner {
1585            sst_id,
1586            key_range: KeyRange {
1587                left: full_key_l.into(),
1588                right: full_key_r.into(),
1589                right_exclusive: false,
1590            },
1591            table_ids,
1592            object_id: sst_id,
1593            min_epoch: 20,
1594            max_epoch: 20,
1595            file_size: 100,
1596            sst_size: 100,
1597            ..Default::default()
1598        }
1599    }
1600
1601    #[test]
1602    fn test_get_sst_object_ids() {
1603        let mut version = HummockVersion {
1604            id: HummockVersionId::new(0),
1605            levels: HashMap::from_iter([(
1606                0,
1607                Levels {
1608                    levels: vec![],
1609                    l0: OverlappingLevel {
1610                        sub_levels: vec![],
1611                        total_file_size: 0,
1612                        uncompressed_file_size: 0,
1613                    },
1614                    ..Default::default()
1615                },
1616            )]),
1617            ..Default::default()
1618        };
1619        assert_eq!(version.get_object_ids(false).len(), 0);
1620
1621        // Add to sub level
1622        version
1623            .levels
1624            .get_mut(&0)
1625            .unwrap()
1626            .l0
1627            .sub_levels
1628            .push(Level {
1629                table_infos: vec![
1630                    SstableInfoInner {
1631                        object_id: 11,
1632                        sst_id: 11,
1633                        ..Default::default()
1634                    }
1635                    .into(),
1636                ],
1637                ..Default::default()
1638            });
1639        assert_eq!(version.get_object_ids(false).len(), 1);
1640
1641        // Add to non sub level
1642        version.levels.get_mut(&0).unwrap().levels.push(Level {
1643            table_infos: vec![
1644                SstableInfoInner {
1645                    object_id: 22,
1646                    sst_id: 22,
1647                    ..Default::default()
1648                }
1649                .into(),
1650            ],
1651            ..Default::default()
1652        });
1653        assert_eq!(version.get_object_ids(false).len(), 2);
1654    }
1655
1656    #[test]
1657    fn test_apply_version_delta() {
1658        let mut version = HummockVersion {
1659            id: HummockVersionId::new(0),
1660            levels: HashMap::from_iter([
1661                (
1662                    0,
1663                    build_initial_compaction_group_levels(
1664                        0,
1665                        &CompactionConfig {
1666                            max_level: 6,
1667                            ..Default::default()
1668                        },
1669                    ),
1670                ),
1671                (
1672                    1,
1673                    build_initial_compaction_group_levels(
1674                        1,
1675                        &CompactionConfig {
1676                            max_level: 6,
1677                            ..Default::default()
1678                        },
1679                    ),
1680                ),
1681            ]),
1682            ..Default::default()
1683        };
1684        let version_delta = HummockVersionDelta {
1685            id: HummockVersionId::new(1),
1686            group_deltas: HashMap::from_iter([
1687                (
1688                    2,
1689                    GroupDeltas {
1690                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1691                            group_config: Some(CompactionConfig {
1692                                max_level: 6,
1693                                ..Default::default()
1694                            }),
1695                            ..Default::default()
1696                        }))],
1697                    },
1698                ),
1699                (
1700                    0,
1701                    GroupDeltas {
1702                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1703                    },
1704                ),
1705                (
1706                    1,
1707                    GroupDeltas {
1708                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1709                            1,
1710                            0,
1711                            HashSet::new(),
1712                            vec![
1713                                SstableInfoInner {
1714                                    object_id: 1,
1715                                    sst_id: 1,
1716                                    ..Default::default()
1717                                }
1718                                .into(),
1719                            ],
1720                            0,
1721                            version
1722                                .levels
1723                                .get(&1)
1724                                .as_ref()
1725                                .unwrap()
1726                                .compaction_group_version_id,
1727                        ))],
1728                    },
1729                ),
1730            ]),
1731            ..Default::default()
1732        };
1733        let version_delta = version_delta;
1734
1735        version.apply_version_delta(&version_delta);
1736        let mut cg1 = build_initial_compaction_group_levels(
1737            1,
1738            &CompactionConfig {
1739                max_level: 6,
1740                ..Default::default()
1741            },
1742        );
1743        cg1.levels[0] = Level {
1744            level_idx: 1,
1745            level_type: LevelType::Nonoverlapping,
1746            table_infos: vec![
1747                SstableInfoInner {
1748                    object_id: 1,
1749                    sst_id: 1,
1750                    ..Default::default()
1751                }
1752                .into(),
1753            ],
1754            ..Default::default()
1755        };
1756        assert_eq!(
1757            version,
1758            HummockVersion {
1759                id: HummockVersionId::new(1),
1760                levels: HashMap::from_iter([
1761                    (
1762                        2,
1763                        build_initial_compaction_group_levels(
1764                            2,
1765                            &CompactionConfig {
1766                                max_level: 6,
1767                                ..Default::default()
1768                            },
1769                        ),
1770                    ),
1771                    (1, cg1),
1772                ]),
1773                ..Default::default()
1774            }
1775        );
1776    }
1777
1778    fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1779        gen_sst_info_impl(object_id, table_ids, left, right).into()
1780    }
1781
1782    fn gen_sst_info_impl(
1783        object_id: u64,
1784        table_ids: Vec<u32>,
1785        left: Bytes,
1786        right: Bytes,
1787    ) -> SstableInfoInner {
1788        SstableInfoInner {
1789            object_id,
1790            sst_id: object_id,
1791            key_range: KeyRange {
1792                left,
1793                right,
1794                right_exclusive: false,
1795            },
1796            table_ids,
1797            file_size: 100,
1798            sst_size: 100,
1799            uncompressed_file_size: 100,
1800            ..Default::default()
1801        }
1802    }
1803
1804    #[test]
1805    fn test_merge_levels() {
1806        let mut left_levels = build_initial_compaction_group_levels(
1807            1,
1808            &CompactionConfig {
1809                max_level: 6,
1810                ..Default::default()
1811            },
1812        );
1813
1814        let mut right_levels = build_initial_compaction_group_levels(
1815            2,
1816            &CompactionConfig {
1817                max_level: 6,
1818                ..Default::default()
1819            },
1820        );
1821
1822        left_levels.levels[0] = Level {
1823            level_idx: 1,
1824            level_type: LevelType::Nonoverlapping,
1825            table_infos: vec![
1826                gen_sst_info(
1827                    1,
1828                    vec![3],
1829                    FullKey::for_test(
1830                        TableId::new(3),
1831                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1832                        0,
1833                    )
1834                    .encode()
1835                    .into(),
1836                    FullKey::for_test(
1837                        TableId::new(3),
1838                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1839                        0,
1840                    )
1841                    .encode()
1842                    .into(),
1843                ),
1844                gen_sst_info(
1845                    10,
1846                    vec![3, 4],
1847                    FullKey::for_test(
1848                        TableId::new(3),
1849                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1850                        0,
1851                    )
1852                    .encode()
1853                    .into(),
1854                    FullKey::for_test(
1855                        TableId::new(4),
1856                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1857                        0,
1858                    )
1859                    .encode()
1860                    .into(),
1861                ),
1862                gen_sst_info(
1863                    11,
1864                    vec![4],
1865                    FullKey::for_test(
1866                        TableId::new(4),
1867                        gen_key_from_str(VirtualNode::from_index(11), "1"),
1868                        0,
1869                    )
1870                    .encode()
1871                    .into(),
1872                    FullKey::for_test(
1873                        TableId::new(4),
1874                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1875                        0,
1876                    )
1877                    .encode()
1878                    .into(),
1879                ),
1880            ],
1881            total_file_size: 300,
1882            ..Default::default()
1883        };
1884
1885        left_levels.l0.sub_levels.push(Level {
1886            level_idx: 0,
1887            table_infos: vec![gen_sst_info(
1888                3,
1889                vec![3],
1890                FullKey::for_test(
1891                    TableId::new(3),
1892                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1893                    0,
1894                )
1895                .encode()
1896                .into(),
1897                FullKey::for_test(
1898                    TableId::new(3),
1899                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1900                    0,
1901                )
1902                .encode()
1903                .into(),
1904            )],
1905            sub_level_id: 101,
1906            level_type: LevelType::Overlapping,
1907            total_file_size: 100,
1908            ..Default::default()
1909        });
1910
1911        left_levels.l0.sub_levels.push(Level {
1912            level_idx: 0,
1913            table_infos: vec![gen_sst_info(
1914                3,
1915                vec![3],
1916                FullKey::for_test(
1917                    TableId::new(3),
1918                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1919                    0,
1920                )
1921                .encode()
1922                .into(),
1923                FullKey::for_test(
1924                    TableId::new(3),
1925                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1926                    0,
1927                )
1928                .encode()
1929                .into(),
1930            )],
1931            sub_level_id: 103,
1932            level_type: LevelType::Overlapping,
1933            total_file_size: 100,
1934            ..Default::default()
1935        });
1936
1937        left_levels.l0.sub_levels.push(Level {
1938            level_idx: 0,
1939            table_infos: vec![gen_sst_info(
1940                3,
1941                vec![3],
1942                FullKey::for_test(
1943                    TableId::new(3),
1944                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1945                    0,
1946                )
1947                .encode()
1948                .into(),
1949                FullKey::for_test(
1950                    TableId::new(3),
1951                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1952                    0,
1953                )
1954                .encode()
1955                .into(),
1956            )],
1957            sub_level_id: 105,
1958            level_type: LevelType::Nonoverlapping,
1959            total_file_size: 100,
1960            ..Default::default()
1961        });
1962
1963        right_levels.levels[0] = Level {
1964            level_idx: 1,
1965            level_type: LevelType::Nonoverlapping,
1966            table_infos: vec![
1967                gen_sst_info(
1968                    1,
1969                    vec![5],
1970                    FullKey::for_test(
1971                        TableId::new(5),
1972                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1973                        0,
1974                    )
1975                    .encode()
1976                    .into(),
1977                    FullKey::for_test(
1978                        TableId::new(5),
1979                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1980                        0,
1981                    )
1982                    .encode()
1983                    .into(),
1984                ),
1985                gen_sst_info(
1986                    10,
1987                    vec![5, 6],
1988                    FullKey::for_test(
1989                        TableId::new(5),
1990                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1991                        0,
1992                    )
1993                    .encode()
1994                    .into(),
1995                    FullKey::for_test(
1996                        TableId::new(6),
1997                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1998                        0,
1999                    )
2000                    .encode()
2001                    .into(),
2002                ),
2003                gen_sst_info(
2004                    11,
2005                    vec![6],
2006                    FullKey::for_test(
2007                        TableId::new(6),
2008                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2009                        0,
2010                    )
2011                    .encode()
2012                    .into(),
2013                    FullKey::for_test(
2014                        TableId::new(6),
2015                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2016                        0,
2017                    )
2018                    .encode()
2019                    .into(),
2020                ),
2021            ],
2022            total_file_size: 300,
2023            ..Default::default()
2024        };
2025
2026        right_levels.l0.sub_levels.push(Level {
2027            level_idx: 0,
2028            table_infos: vec![gen_sst_info(
2029                3,
2030                vec![5],
2031                FullKey::for_test(
2032                    TableId::new(5),
2033                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2034                    0,
2035                )
2036                .encode()
2037                .into(),
2038                FullKey::for_test(
2039                    TableId::new(5),
2040                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2041                    0,
2042                )
2043                .encode()
2044                .into(),
2045            )],
2046            sub_level_id: 101,
2047            level_type: LevelType::Overlapping,
2048            total_file_size: 100,
2049            ..Default::default()
2050        });
2051
2052        right_levels.l0.sub_levels.push(Level {
2053            level_idx: 0,
2054            table_infos: vec![gen_sst_info(
2055                5,
2056                vec![5],
2057                FullKey::for_test(
2058                    TableId::new(5),
2059                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2060                    0,
2061                )
2062                .encode()
2063                .into(),
2064                FullKey::for_test(
2065                    TableId::new(5),
2066                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2067                    0,
2068                )
2069                .encode()
2070                .into(),
2071            )],
2072            sub_level_id: 102,
2073            level_type: LevelType::Overlapping,
2074            total_file_size: 100,
2075            ..Default::default()
2076        });
2077
2078        right_levels.l0.sub_levels.push(Level {
2079            level_idx: 0,
2080            table_infos: vec![gen_sst_info(
2081                3,
2082                vec![5],
2083                FullKey::for_test(
2084                    TableId::new(5),
2085                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2086                    0,
2087                )
2088                .encode()
2089                .into(),
2090                FullKey::for_test(
2091                    TableId::new(5),
2092                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2093                    0,
2094                )
2095                .encode()
2096                .into(),
2097            )],
2098            sub_level_id: 103,
2099            level_type: LevelType::Nonoverlapping,
2100            total_file_size: 100,
2101            ..Default::default()
2102        });
2103
2104        {
2105            // test empty
2106            let mut left_levels = Levels::default();
2107            let right_levels = Levels::default();
2108
2109            group_split::merge_levels(&mut left_levels, right_levels);
2110        }
2111
2112        {
2113            // test empty left
2114            let mut left_levels = build_initial_compaction_group_levels(
2115                1,
2116                &CompactionConfig {
2117                    max_level: 6,
2118                    ..Default::default()
2119                },
2120            );
2121            let right_levels = right_levels.clone();
2122
2123            group_split::merge_levels(&mut left_levels, right_levels);
2124
2125            assert!(left_levels.l0.sub_levels.len() == 3);
2126            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2127            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2128            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2129            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2130            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2131            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2132
2133            assert!(left_levels.levels[0].level_idx == 1);
2134            assert_eq!(300, left_levels.levels[0].total_file_size);
2135        }
2136
2137        {
2138            // test empty right
2139            let mut left_levels = left_levels.clone();
2140            let right_levels = build_initial_compaction_group_levels(
2141                2,
2142                &CompactionConfig {
2143                    max_level: 6,
2144                    ..Default::default()
2145                },
2146            );
2147
2148            group_split::merge_levels(&mut left_levels, right_levels);
2149
2150            assert!(left_levels.l0.sub_levels.len() == 3);
2151            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2152            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2153            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2154            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2155            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2156            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2157
2158            assert!(left_levels.levels[0].level_idx == 1);
2159            assert_eq!(300, left_levels.levels[0].total_file_size);
2160        }
2161
2162        {
2163            let mut left_levels = left_levels.clone();
2164            let right_levels = right_levels.clone();
2165
2166            group_split::merge_levels(&mut left_levels, right_levels);
2167
2168            assert!(left_levels.l0.sub_levels.len() == 6);
2169            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2170            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2171            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2172            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2173            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2174            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2175            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2176            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2177            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2178            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2179            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2180            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2181
2182            assert!(left_levels.levels[0].level_idx == 1);
2183            assert_eq!(600, left_levels.levels[0].total_file_size);
2184        }
2185    }
2186
2187    #[test]
2188    fn test_get_split_pos() {
2189        let epoch = test_epoch(1);
2190        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2191        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2192        let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2193
2194        let ssts = vec![s1, s2, s3];
2195        let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2196
2197        let pos = group_split::get_split_pos(&ssts, split_key.clone());
2198        assert_eq!(1, pos);
2199
2200        let pos = group_split::get_split_pos(&vec![], split_key);
2201        assert_eq!(0, pos);
2202    }
2203
2204    #[test]
2205    fn test_split_sst() {
2206        let epoch = test_epoch(1);
2207        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2208
2209        {
2210            let split_key = group_split::build_split_key(3, VirtualNode::ZERO);
2211            let origin_sst = sst.clone();
2212            let sst_size = origin_sst.sst_size;
2213            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2214            assert_eq!(SstSplitType::Both, split_type);
2215
2216            let mut new_sst_id = 10;
2217            let (origin_sst, branched_sst) = group_split::split_sst(
2218                origin_sst,
2219                &mut new_sst_id,
2220                split_key,
2221                sst_size / 2,
2222                sst_size / 2,
2223            );
2224
2225            let origin_sst = origin_sst.unwrap();
2226            let branched_sst = branched_sst.unwrap();
2227
2228            assert!(origin_sst.key_range.right_exclusive);
2229            assert!(
2230                origin_sst
2231                    .key_range
2232                    .right
2233                    .cmp(&branched_sst.key_range.left)
2234                    .is_le()
2235            );
2236            assert!(origin_sst.table_ids.is_sorted());
2237            assert!(branched_sst.table_ids.is_sorted());
2238            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2239            assert!(branched_sst.sst_size < origin_sst.file_size);
2240            assert_eq!(10, branched_sst.sst_id);
2241            assert_eq!(11, origin_sst.sst_id);
2242            assert_eq!(&3, branched_sst.table_ids.first().unwrap()); // split table_id to right
2243        }
2244
2245        {
2246            // test un-exist table_id
2247            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2248            let origin_sst = sst.clone();
2249            let sst_size = origin_sst.sst_size;
2250            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2251            assert_eq!(SstSplitType::Both, split_type);
2252
2253            let mut new_sst_id = 10;
2254            let (origin_sst, branched_sst) = group_split::split_sst(
2255                origin_sst,
2256                &mut new_sst_id,
2257                split_key,
2258                sst_size / 2,
2259                sst_size / 2,
2260            );
2261
2262            let origin_sst = origin_sst.unwrap();
2263            let branched_sst = branched_sst.unwrap();
2264
2265            assert!(origin_sst.key_range.right_exclusive);
2266            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2267            assert!(origin_sst.table_ids.is_sorted());
2268            assert!(branched_sst.table_ids.is_sorted());
2269            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2270            assert!(branched_sst.sst_size < origin_sst.file_size);
2271            assert_eq!(10, branched_sst.sst_id);
2272            assert_eq!(11, origin_sst.sst_id);
2273            assert_eq!(&5, branched_sst.table_ids.first().unwrap()); // split table_id to right
2274        }
2275
2276        {
2277            let split_key = group_split::build_split_key(6, VirtualNode::ZERO);
2278            let origin_sst = sst.clone();
2279            let split_type = group_split::need_to_split(&origin_sst, split_key);
2280            assert_eq!(SstSplitType::Left, split_type);
2281        }
2282
2283        {
2284            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2285            let origin_sst = sst.clone();
2286            let split_type = group_split::need_to_split(&origin_sst, split_key);
2287            assert_eq!(SstSplitType::Both, split_type);
2288
2289            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2290            let origin_sst = sst.clone();
2291            let split_type = group_split::need_to_split(&origin_sst, split_key);
2292            assert_eq!(SstSplitType::Right, split_type);
2293        }
2294
2295        {
2296            // test key_range left = right
2297            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2298            sst.key_range.right = sst.key_range.left.clone();
2299            let sst: SstableInfo = sst.into();
2300            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2301            let origin_sst = sst.clone();
2302            let sst_size = origin_sst.sst_size;
2303
2304            let mut new_sst_id = 10;
2305            let (origin_sst, branched_sst) = group_split::split_sst(
2306                origin_sst,
2307                &mut new_sst_id,
2308                split_key,
2309                sst_size / 2,
2310                sst_size / 2,
2311            );
2312
2313            assert!(origin_sst.is_none());
2314            assert!(branched_sst.is_some());
2315        }
2316    }
2317
2318    #[test]
2319    fn test_split_sst_info_for_level() {
2320        let mut version = HummockVersion {
2321            id: HummockVersionId(0),
2322            levels: HashMap::from_iter([(
2323                1,
2324                build_initial_compaction_group_levels(
2325                    1,
2326                    &CompactionConfig {
2327                        max_level: 6,
2328                        ..Default::default()
2329                    },
2330                ),
2331            )]),
2332            ..Default::default()
2333        };
2334
2335        let cg1 = version.levels.get_mut(&1).unwrap();
2336
2337        cg1.levels[0] = Level {
2338            level_idx: 1,
2339            level_type: LevelType::Nonoverlapping,
2340            table_infos: vec![
2341                gen_sst_info(
2342                    1,
2343                    vec![3],
2344                    FullKey::for_test(
2345                        TableId::new(3),
2346                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2347                        0,
2348                    )
2349                    .encode()
2350                    .into(),
2351                    FullKey::for_test(
2352                        TableId::new(3),
2353                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2354                        0,
2355                    )
2356                    .encode()
2357                    .into(),
2358                ),
2359                gen_sst_info(
2360                    10,
2361                    vec![3, 4],
2362                    FullKey::for_test(
2363                        TableId::new(3),
2364                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2365                        0,
2366                    )
2367                    .encode()
2368                    .into(),
2369                    FullKey::for_test(
2370                        TableId::new(4),
2371                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2372                        0,
2373                    )
2374                    .encode()
2375                    .into(),
2376                ),
2377                gen_sst_info(
2378                    11,
2379                    vec![4],
2380                    FullKey::for_test(
2381                        TableId::new(4),
2382                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2383                        0,
2384                    )
2385                    .encode()
2386                    .into(),
2387                    FullKey::for_test(
2388                        TableId::new(4),
2389                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2390                        0,
2391                    )
2392                    .encode()
2393                    .into(),
2394                ),
2395            ],
2396            total_file_size: 300,
2397            ..Default::default()
2398        };
2399
2400        cg1.l0.sub_levels.push(Level {
2401            level_idx: 0,
2402            table_infos: vec![
2403                gen_sst_info(
2404                    2,
2405                    vec![2],
2406                    FullKey::for_test(
2407                        TableId::new(0),
2408                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2409                        0,
2410                    )
2411                    .encode()
2412                    .into(),
2413                    FullKey::for_test(
2414                        TableId::new(2),
2415                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2416                        0,
2417                    )
2418                    .encode()
2419                    .into(),
2420                ),
2421                gen_sst_info(
2422                    22,
2423                    vec![2],
2424                    FullKey::for_test(
2425                        TableId::new(0),
2426                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2427                        0,
2428                    )
2429                    .encode()
2430                    .into(),
2431                    FullKey::for_test(
2432                        TableId::new(2),
2433                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2434                        0,
2435                    )
2436                    .encode()
2437                    .into(),
2438                ),
2439                gen_sst_info(
2440                    23,
2441                    vec![2],
2442                    FullKey::for_test(
2443                        TableId::new(0),
2444                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2445                        0,
2446                    )
2447                    .encode()
2448                    .into(),
2449                    FullKey::for_test(
2450                        TableId::new(2),
2451                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2452                        0,
2453                    )
2454                    .encode()
2455                    .into(),
2456                ),
2457                gen_sst_info(
2458                    24,
2459                    vec![2],
2460                    FullKey::for_test(
2461                        TableId::new(2),
2462                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2463                        0,
2464                    )
2465                    .encode()
2466                    .into(),
2467                    FullKey::for_test(
2468                        TableId::new(2),
2469                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2470                        0,
2471                    )
2472                    .encode()
2473                    .into(),
2474                ),
2475                gen_sst_info(
2476                    25,
2477                    vec![2],
2478                    FullKey::for_test(
2479                        TableId::new(0),
2480                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2481                        0,
2482                    )
2483                    .encode()
2484                    .into(),
2485                    FullKey::for_test(
2486                        TableId::new(0),
2487                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2488                        0,
2489                    )
2490                    .encode()
2491                    .into(),
2492                ),
2493            ],
2494            sub_level_id: 101,
2495            level_type: LevelType::Overlapping,
2496            total_file_size: 300,
2497            ..Default::default()
2498        });
2499
2500        {
2501            // split Overlapping level
2502            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2503
2504            let mut new_sst_id = 100;
2505            let x = group_split::split_sst_info_for_level_v2(
2506                &mut cg1.l0.sub_levels[0],
2507                &mut new_sst_id,
2508                split_key,
2509            );
2510            // assert_eq!(3, x.len());
2511            // assert_eq!(100, x[0].sst_id);
2512            // assert_eq!(100, x[0].sst_size);
2513            // assert_eq!(101, x[1].sst_id);
2514            // assert_eq!(100, x[1].sst_size);
2515            // assert_eq!(102, x[2].sst_id);
2516            // assert_eq!(100, x[2].sst_size);
2517
2518            let mut right_l0 = OverlappingLevel {
2519                sub_levels: vec![],
2520                total_file_size: 0,
2521                uncompressed_file_size: 0,
2522            };
2523
2524            right_l0.sub_levels.push(Level {
2525                level_idx: 0,
2526                table_infos: x,
2527                sub_level_id: 101,
2528                total_file_size: 100,
2529                level_type: LevelType::Overlapping,
2530                ..Default::default()
2531            });
2532
2533            let right_levels = Levels {
2534                levels: vec![],
2535                l0: right_l0,
2536                ..Default::default()
2537            };
2538
2539            merge_levels(cg1, right_levels);
2540        }
2541
2542        {
2543            // test split empty level
2544            let mut new_sst_id = 100;
2545            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2546            let x = group_split::split_sst_info_for_level_v2(
2547                &mut cg1.levels[2],
2548                &mut new_sst_id,
2549                split_key,
2550            );
2551
2552            assert!(x.is_empty());
2553        }
2554
2555        {
2556            // test split to right Nonoverlapping level
2557            let mut cg1 = cg1.clone();
2558            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2559
2560            let mut new_sst_id = 100;
2561            let x = group_split::split_sst_info_for_level_v2(
2562                &mut cg1.levels[0],
2563                &mut new_sst_id,
2564                split_key,
2565            );
2566
2567            assert_eq!(3, x.len());
2568            assert_eq!(1, x[0].sst_id);
2569            assert_eq!(100, x[0].sst_size);
2570            assert_eq!(10, x[1].sst_id);
2571            assert_eq!(100, x[1].sst_size);
2572            assert_eq!(11, x[2].sst_id);
2573            assert_eq!(100, x[2].sst_size);
2574
2575            assert_eq!(0, cg1.levels[0].table_infos.len());
2576        }
2577
2578        {
2579            // test split to left Nonoverlapping level
2580            let mut cg1 = cg1.clone();
2581            let split_key = group_split::build_split_key(5, VirtualNode::ZERO);
2582
2583            let mut new_sst_id = 100;
2584            let x = group_split::split_sst_info_for_level_v2(
2585                &mut cg1.levels[0],
2586                &mut new_sst_id,
2587                split_key,
2588            );
2589
2590            assert_eq!(0, x.len());
2591            assert_eq!(3, cg1.levels[0].table_infos.len());
2592        }
2593
2594        // {
2595        //     // test split to both Nonoverlapping level
2596        //     let mut cg1 = cg1.clone();
2597        //     let split_key = build_split_key(3, VirtualNode::MAX);
2598
2599        //     let mut new_sst_id = 100;
2600        //     let x = group_split::split_sst_info_for_level_v2(
2601        //         &mut cg1.levels[0],
2602        //         &mut new_sst_id,
2603        //         split_key,
2604        //     );
2605
2606        //     assert_eq!(2, x.len());
2607        //     assert_eq!(100, x[0].sst_id);
2608        //     assert_eq!(100 / 2, x[0].sst_size);
2609        //     assert_eq!(11, x[1].sst_id);
2610        //     assert_eq!(100, x[1].sst_size);
2611        //     assert_eq!(vec![3, 4], x[0].table_ids);
2612
2613        //     assert_eq!(2, cg1.levels[0].table_infos.len());
2614        //     assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2615        //     assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2616        //     assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2617        // }
2618
2619        {
2620            // test split to both Nonoverlapping level
2621            let mut cg1 = cg1.clone();
2622            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2623
2624            let mut new_sst_id = 100;
2625            let x = group_split::split_sst_info_for_level_v2(
2626                &mut cg1.levels[0],
2627                &mut new_sst_id,
2628                split_key,
2629            );
2630
2631            assert_eq!(2, x.len());
2632            assert_eq!(100, x[0].sst_id);
2633            assert_eq!(100 / 2, x[0].sst_size);
2634            assert_eq!(11, x[1].sst_id);
2635            assert_eq!(100, x[1].sst_size);
2636            assert_eq!(vec![4], x[1].table_ids);
2637
2638            assert_eq!(2, cg1.levels[0].table_infos.len());
2639            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2640            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2641            assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2642        }
2643    }
2644}