risingwave_hummock_sdk/compaction_group/
hummock_version_ext.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_pb::hummock::{
27    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
28};
29use tracing::warn;
30
31use super::group_split::split_sst_with_table_ids;
32use super::{StateTableId, group_split};
33use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
34use crate::compact_task::is_compaction_task_expired;
35use crate::compaction_group::StaticCompactionGroupId;
36use crate::key_range::KeyRangeCommon;
37use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
38use crate::sstable_info::SstableInfo;
39use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
40use crate::vector_index::apply_vector_index_delta;
41use crate::version::{
42    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
43    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
44};
45use crate::{
46    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
47};
48
/// Per-compaction-group summary of the SST changes carried by a version delta, as built by
/// `build_sst_delta_infos`.
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    /// Level index the inserted SSTs were added to (0 for L0). When several deltas of the
    /// group insert SSTs, the level of the last inserting delta is kept.
    pub insert_sst_level: u32,
    /// SSTs inserted by the delta.
    pub insert_sst_infos: Vec<SstableInfo>,
    /// Object ids of the SSTs removed by the delta.
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}

/// Maps a compaction group id to a list of SST ids.
/// NOTE(review): not used in the visible part of this file; presumably describes which groups
/// hold a branched SST — confirm against its users.
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
57
58impl<L> HummockVersionCommon<SstableInfo, L> {
59    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
60        self.levels
61            .get(&compaction_group_id)
62            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
63    }
64
65    pub fn get_compaction_group_levels_mut(
66        &mut self,
67        compaction_group_id: CompactionGroupId,
68    ) -> &mut Levels {
69        self.levels
70            .get_mut(&compaction_group_id)
71            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
72    }
73
74    // only scan the sst infos from levels in the specified compaction group (without table change log)
75    pub fn get_sst_ids_by_group_id(
76        &self,
77        compaction_group_id: CompactionGroupId,
78    ) -> impl Iterator<Item = HummockSstableId> + '_ {
79        self.levels
80            .iter()
81            .filter_map(move |(cg_id, level)| {
82                if *cg_id == compaction_group_id {
83                    Some(level)
84                } else {
85                    None
86                }
87            })
88            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
89            .flat_map(|level| level.table_infos.iter())
90            .map(|s| s.sst_id)
91    }
92
93    pub fn level_iter<F: FnMut(&Level) -> bool>(
94        &self,
95        compaction_group_id: CompactionGroupId,
96        mut f: F,
97    ) {
98        if let Some(levels) = self.levels.get(&compaction_group_id) {
99            for sub_level in &levels.l0.sub_levels {
100                if !f(sub_level) {
101                    return;
102                }
103            }
104            for level in &levels.levels {
105                if !f(level) {
106                    return;
107                }
108            }
109        }
110    }
111
112    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
113        // l0 is currently separated from all levels
114        self.levels
115            .get(&compaction_group_id)
116            .map(|group| group.levels.len() + 1)
117            .unwrap_or(0)
118    }
119
120    pub fn safe_epoch_table_watermarks(
121        &self,
122        existing_table_ids: &[u32],
123    ) -> BTreeMap<u32, TableWatermarks> {
124        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
125    }
126}
127
128pub fn safe_epoch_table_watermarks_impl(
129    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
130    existing_table_ids: &[u32],
131) -> BTreeMap<u32, TableWatermarks> {
132    fn extract_single_table_watermark(
133        table_watermarks: &TableWatermarks,
134    ) -> Option<TableWatermarks> {
135        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
136            Some(TableWatermarks {
137                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
138                direction: table_watermarks.direction,
139                watermark_type: table_watermarks.watermark_type,
140            })
141        } else {
142            None
143        }
144    }
145    table_watermarks
146        .iter()
147        .filter_map(|(table_id, table_watermarks)| {
148            let u32_table_id = table_id.table_id();
149            if !existing_table_ids.contains(&u32_table_id) {
150                None
151            } else {
152                extract_single_table_watermark(table_watermarks)
153                    .map(|table_watermarks| (table_id.table_id, table_watermarks))
154            }
155        })
156        .collect()
157}
158
159pub fn safe_epoch_read_table_watermarks_impl(
160    safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
161) -> BTreeMap<TableId, ReadTableWatermark> {
162    safe_epoch_watermarks
163        .into_iter()
164        .map(|(table_id, watermarks)| {
165            assert_eq!(watermarks.watermarks.len(), 1);
166            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
167            let mut vnode_watermark_map = BTreeMap::new();
168            for vnode_watermark in vnode_watermarks.iter() {
169                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
170                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
171                    assert!(
172                        vnode_watermark_map
173                            .insert(vnode, watermark.clone())
174                            .is_none(),
175                        "duplicate table watermark on vnode {}",
176                        vnode.to_index()
177                    );
178                }
179            }
180            (
181                TableId::from(table_id),
182                ReadTableWatermark {
183                    direction: watermarks.direction,
184                    vnode_watermarks: vnode_watermark_map,
185                },
186            )
187        })
188        .collect()
189}
190
191impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
192    pub fn count_new_ssts_in_group_split(
193        &self,
194        parent_group_id: CompactionGroupId,
195        split_key: Bytes,
196    ) -> u64 {
197        self.levels
198            .get(&parent_group_id)
199            .map_or(0, |parent_levels| {
200                let l0 = &parent_levels.l0;
201                let mut split_count = 0;
202                for sub_level in &l0.sub_levels {
203                    assert!(!sub_level.table_infos.is_empty());
204
205                    if sub_level.level_type == PbLevelType::Overlapping {
206                        // TODO: use table_id / vnode / key_range filter
207                        split_count += sub_level
208                            .table_infos
209                            .iter()
210                            .map(|sst| {
211                                if let group_split::SstSplitType::Both =
212                                    group_split::need_to_split(sst, split_key.clone())
213                                {
214                                    2
215                                } else {
216                                    0
217                                }
218                            })
219                            .sum::<u64>();
220                        continue;
221                    }
222
223                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
224                    let sst = sub_level.table_infos.get(pos).unwrap();
225
226                    if let group_split::SstSplitType::Both =
227                        group_split::need_to_split(sst, split_key.clone())
228                    {
229                        split_count += 2;
230                    }
231                }
232
233                for level in &parent_levels.levels {
234                    if level.table_infos.is_empty() {
235                        continue;
236                    }
237                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
238                    let sst = level.table_infos.get(pos).unwrap();
239                    if let group_split::SstSplitType::Both =
240                        group_split::need_to_split(sst, split_key.clone())
241                    {
242                        split_count += 2;
243                    }
244                }
245
246                split_count
247            })
248    }
249
    /// Legacy group split: fills the newly constructed group `group_id` with SSTs split off
    /// from `parent_group_id`.
    ///
    /// For every level of the parent, `split_sst_info_for_level(&member_table_ids, ..)`
    /// produces the SSTs to add to the new group. Presumably these are the parts of the
    /// parent's SSTs that cover `member_table_ids` — TODO confirm against that helper. Parent
    /// SSTs left with no table ids are then removed from the parent, and its size counters are
    /// adjusted. `apply_version_delta` uses this path for `GroupConstruct` deltas older than
    /// `CompatibilityVersion::SplitGroupByTableId`.
    ///
    /// If `parent_group_id` is `StaticCompactionGroupId::NewCompactionGroup`, nothing is moved.
    /// In that case a non-zero `new_sst_start_id` panics in debug builds and only logs a
    /// warning otherwise.
    ///
    /// `new_sst_start_id` seeds the id counter handed (as `&mut`) to `split_sst_info_for_level`.
    ///
    /// # Panics
    /// - if the parent group does not exist, or `group_id` is not already in `self.levels`;
    /// - if `parent_group_id == group_id` (`HashMap::get_disjoint_mut` rejects duplicate keys);
    /// - if the new group has fewer non-L0 levels than the parent;
    /// - if a non-L0 level of the new group cannot be concatenated after the merge;
    /// - if any L0 sub-level of either group ends up empty.
    pub fn init_with_parent_group(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        member_table_ids: BTreeSet<StateTableId>,
        new_sst_start_id: HummockSstableId,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from",
                parent_group_id
            );
        }
        // Borrow the parent and the new group mutably at the same time; both must exist.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Removing SSTs from a sub-level may leave it empty; empty sub-levels are
                // purged by the `retain` right after this loop.
                let insert_table_infos =
                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
                // Drop parent SSTs left without any table id, keeping both the sub-level and
                // the L0 size counters in sync.
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                if insert_table_infos.is_empty() {
                    continue;
                }
                // `Ok(idx)`: add to the existing sub-level at `idx` of the new group;
                // `Err(idx)`: create a sub-level mirroring the parent's id and type at `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }
        // Non-L0 levels: the new group's level at the same index receives the split SSTs.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos =
                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Re-sort by key range so the level stays concatenable after the merge.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        // Neither group may be left with an empty L0 sub-level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
367
368    pub fn build_sst_delta_infos(
369        &self,
370        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
371    ) -> Vec<SstDeltaInfo> {
372        let mut infos = vec![];
373
374        // Skip trivial move delta for refiller
375        // The trivial move task only changes the position of the sst in the lsm, it does not modify the object information corresponding to the sst, and does not need to re-execute the refill.
376        if version_delta.trivial_move {
377            return infos;
378        }
379
380        for (group_id, group_deltas) in &version_delta.group_deltas {
381            let mut info = SstDeltaInfo::default();
382
383            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
384            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
385
386            // Build only if all deltas are intra level deltas.
387            if !group_deltas.group_deltas.iter().all(|delta| {
388                matches!(
389                    delta,
390                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
391                )
392            }) {
393                continue;
394            }
395
396            // TODO(MrCroxx): At most one insert delta is allowed here. It's okay for now with the
397            // current `hummock::manager::gen_version_delta` implementation. Better refactor the
398            // struct to reduce conventions.
399            for group_delta in &group_deltas.group_deltas {
400                match group_delta {
401                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
402                        if !inserted_table_infos.is_empty() {
403                            info.insert_sst_level = 0;
404                            info.insert_sst_infos
405                                .extend(inserted_table_infos.iter().cloned());
406                        }
407                    }
408                    GroupDeltaCommon::IntraLevel(intra_level) => {
409                        if !intra_level.inserted_table_infos.is_empty() {
410                            info.insert_sst_level = intra_level.level_idx;
411                            info.insert_sst_infos
412                                .extend(intra_level.inserted_table_infos.iter().cloned());
413                        }
414                        if !intra_level.removed_table_ids.is_empty() {
415                            for id in &intra_level.removed_table_ids {
416                                if intra_level.level_idx == 0 {
417                                    removed_l0_ssts.insert(*id);
418                                } else {
419                                    removed_ssts
420                                        .entry(intra_level.level_idx)
421                                        .or_default()
422                                        .insert(*id);
423                                }
424                            }
425                        }
426                    }
427                    GroupDeltaCommon::GroupConstruct(_)
428                    | GroupDeltaCommon::GroupDestroy(_)
429                    | GroupDeltaCommon::GroupMerge(_) => {}
430                }
431            }
432
433            let group = self.levels.get(group_id).unwrap();
434            for l0_sub_level in &group.level0().sub_levels {
435                for sst_info in &l0_sub_level.table_infos {
436                    if removed_l0_ssts.remove(&sst_info.sst_id) {
437                        info.delete_sst_object_ids.push(sst_info.object_id);
438                    }
439                }
440            }
441            for level in &group.levels {
442                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
443                    for sst_info in &level.table_infos {
444                        if removed_level_ssts.remove(&sst_info.sst_id) {
445                            info.delete_sst_object_ids.push(sst_info.object_id);
446                        }
447                    }
448                    if !removed_level_ssts.is_empty() {
449                        tracing::error!(
450                            "removed_level_ssts is not empty: {:?}",
451                            removed_level_ssts,
452                        );
453                    }
454                    debug_assert!(removed_level_ssts.is_empty());
455                }
456            }
457
458            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
459                tracing::error!(
460                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
461                    removed_l0_ssts,
462                    removed_ssts
463                );
464            }
465            debug_assert!(removed_l0_ssts.is_empty());
466            debug_assert!(removed_ssts.is_empty());
467
468            infos.push(info);
469        }
470
471        infos
472    }
473
474    pub fn apply_version_delta(
475        &mut self,
476        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
477    ) {
478        assert_eq!(self.id, version_delta.prev_id);
479
480        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
481            &version_delta.state_table_info_delta,
482            &version_delta.removed_table_ids,
483        );
484
485        #[expect(deprecated)]
486        {
487            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
488                is_commit_epoch = true;
489                tracing::trace!(
490                    "max committed epoch bumped but no table committed epoch is changed"
491                );
492            }
493        }
494
495        // apply to `levels`, which is different compaction groups
496        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
497            let mut is_applied_l0_compact = false;
498            for group_delta in &group_deltas.group_deltas {
499                match group_delta {
500                    GroupDeltaCommon::GroupConstruct(group_construct) => {
501                        let mut new_levels = build_initial_compaction_group_levels(
502                            *compaction_group_id,
503                            group_construct.get_group_config().unwrap(),
504                        );
505                        let parent_group_id = group_construct.parent_group_id;
506                        new_levels.parent_group_id = parent_group_id;
507                        #[expect(deprecated)]
508                        // for backward-compatibility of previous hummock version delta
509                        new_levels
510                            .member_table_ids
511                            .clone_from(&group_construct.table_ids);
512                        self.levels.insert(*compaction_group_id, new_levels);
513                        let member_table_ids = if group_construct.version()
514                            >= CompatibilityVersion::NoMemberTableIds
515                        {
516                            self.state_table_info
517                                .compaction_group_member_table_ids(*compaction_group_id)
518                                .iter()
519                                .map(|table_id| table_id.table_id)
520                                .collect()
521                        } else {
522                            #[expect(deprecated)]
523                            // for backward-compatibility of previous hummock version delta
524                            BTreeSet::from_iter(group_construct.table_ids.clone())
525                        };
526
527                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
528                            let split_key = if group_construct.split_key.is_some() {
529                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
530                            } else {
531                                None
532                            };
533                            self.init_with_parent_group_v2(
534                                parent_group_id,
535                                *compaction_group_id,
536                                group_construct.get_new_sst_start_id().into(),
537                                split_key.clone(),
538                            );
539                        } else {
540                            // for backward-compatibility of previous hummock version delta
541                            self.init_with_parent_group(
542                                parent_group_id,
543                                *compaction_group_id,
544                                member_table_ids,
545                                group_construct.get_new_sst_start_id().into(),
546                            );
547                        }
548                    }
549                    GroupDeltaCommon::GroupMerge(group_merge) => {
550                        tracing::info!(
551                            "group_merge left {:?} right {:?}",
552                            group_merge.left_group_id,
553                            group_merge.right_group_id
554                        );
555                        self.merge_compaction_group(
556                            group_merge.left_group_id,
557                            group_merge.right_group_id,
558                        )
559                    }
560                    GroupDeltaCommon::IntraLevel(level_delta) => {
561                        let levels =
562                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
563                                panic!("compaction group {} does not exist", compaction_group_id)
564                            });
565                        if is_commit_epoch {
566                            assert!(
567                                level_delta.removed_table_ids.is_empty(),
568                                "no sst should be deleted when committing an epoch"
569                            );
570
571                            let IntraLevelDelta {
572                                level_idx,
573                                l0_sub_level_id,
574                                inserted_table_infos,
575                                ..
576                            } = level_delta;
577                            {
578                                assert_eq!(
579                                    *level_idx, 0,
580                                    "we should only add to L0 when we commit an epoch."
581                                );
582                                if !inserted_table_infos.is_empty() {
583                                    insert_new_sub_level(
584                                        &mut levels.l0,
585                                        *l0_sub_level_id,
586                                        PbLevelType::Overlapping,
587                                        inserted_table_infos.clone(),
588                                        None,
589                                    );
590                                }
591                            }
592                        } else {
593                            // The delta is caused by compaction.
594                            levels.apply_compact_ssts(
595                                level_delta,
596                                self.state_table_info
597                                    .compaction_group_member_table_ids(*compaction_group_id),
598                            );
599                            if level_delta.level_idx == 0 {
600                                is_applied_l0_compact = true;
601                            }
602                        }
603                    }
604                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
605                        let levels =
606                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
607                                panic!("compaction group {} does not exist", compaction_group_id)
608                            });
609                        assert!(is_commit_epoch);
610
611                        if !inserted_table_infos.is_empty() {
612                            let next_l0_sub_level_id = levels
613                                .l0
614                                .sub_levels
615                                .last()
616                                .map(|level| level.sub_level_id + 1)
617                                .unwrap_or(1);
618
619                            insert_new_sub_level(
620                                &mut levels.l0,
621                                next_l0_sub_level_id,
622                                PbLevelType::Overlapping,
623                                inserted_table_infos.clone(),
624                                None,
625                            );
626                        }
627                    }
628                    GroupDeltaCommon::GroupDestroy(_) => {
629                        self.levels.remove(compaction_group_id);
630                    }
631                }
632            }
633            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
634            {
635                levels.post_apply_l0_compact();
636            }
637        }
638        self.id = version_delta.id;
639        #[expect(deprecated)]
640        {
641            self.max_committed_epoch = version_delta.max_committed_epoch;
642        }
643
644        // apply to table watermark
645
646        // Store the table watermarks that needs to be updated. None means to remove the table watermark of the table id
647        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
648            HashMap::new();
649
650        // apply to table watermark
651        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
652            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
653                if version_delta.removed_table_ids.contains(table_id) {
654                    modified_table_watermarks.insert(*table_id, None);
655                } else {
656                    let mut current_table_watermarks = (**current_table_watermarks).clone();
657                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
658                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
659                }
660            } else {
661                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
662            }
663        }
664        for (table_id, table_watermarks) in &self.table_watermarks {
665            let safe_epoch = if let Some(state_table_info) =
666                self.state_table_info.info().get(table_id)
667                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
668                && state_table_info.committed_epoch > *oldest_epoch
669            {
670                // safe epoch has progressed, need further clear.
671                state_table_info.committed_epoch
672            } else {
673                // safe epoch not progressed or the table has been removed. No need to truncate
674                continue;
675            };
676            let table_watermarks = modified_table_watermarks
677                .entry(*table_id)
678                .or_insert_with(|| Some((**table_watermarks).clone()));
679            if let Some(table_watermarks) = table_watermarks {
680                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
681            }
682        }
683        // apply the staging table watermark to hummock version
684        for (table_id, table_watermarks) in modified_table_watermarks {
685            if let Some(table_watermarks) = table_watermarks {
686                self.table_watermarks
687                    .insert(table_id, Arc::new(table_watermarks));
688            } else {
689                self.table_watermarks.remove(&table_id);
690            }
691        }
692
693        // apply to table change log
694        Self::apply_change_log_delta(
695            &mut self.table_change_log,
696            &version_delta.change_log_delta,
697            &version_delta.removed_table_ids,
698            &version_delta.state_table_info_delta,
699            &changed_table_info,
700        );
701
702        // apply to vector index
703        apply_vector_index_delta(
704            &mut self.vector_indexes,
705            &version_delta.vector_index_delta,
706            &version_delta.removed_table_ids,
707        );
708    }
709
710    pub fn apply_change_log_delta<T: Clone>(
711        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
712        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
713        removed_table_ids: &HashSet<TableId>,
714        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
715        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
716    ) {
717        for (table_id, change_log_delta) in change_log_delta {
718            let new_change_log = &change_log_delta.new_log;
719            match table_change_log.entry(*table_id) {
720                Entry::Occupied(entry) => {
721                    let change_log = entry.into_mut();
722                    change_log.add_change_log(new_change_log.clone());
723                }
724                Entry::Vacant(entry) => {
725                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
726                }
727            };
728        }
729
730        // If a table has no new change log entry (even an empty one), it means we have stopped maintained
731        // the change log for the table, and then we will remove the table change log.
732        // The table change log will also be removed when the table id is removed.
733        table_change_log.retain(|table_id, _| {
734            if removed_table_ids.contains(table_id) {
735                return false;
736            }
737            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
738                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
739                // the table exists previously, and its committed epoch has progressed.
740            } else {
741                // otherwise, the table change log should be kept anyway
742                return true;
743            }
744            let contains = change_log_delta.contains_key(table_id);
745            if !contains {
746                warn!(
747                        ?table_id,
748                        "table change log dropped due to no further change log at newly committed epoch",
749                    );
750            }
751            contains
752        });
753
754        // truncate the remaining table change log
755        for (table_id, change_log_delta) in change_log_delta {
756            if let Some(change_log) = table_change_log.get_mut(table_id) {
757                change_log.truncate(change_log_delta.truncate_epoch);
758            }
759        }
760    }
761
762    pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
763        let mut ret: BTreeMap<_, _> = BTreeMap::new();
764        for (compaction_group_id, group) in &self.levels {
765            let mut levels = vec![];
766            levels.extend(group.l0.sub_levels.iter());
767            levels.extend(group.levels.iter());
768            for level in levels {
769                for table_info in &level.table_infos {
770                    if table_info.sst_id.inner() == table_info.object_id.inner() {
771                        continue;
772                    }
773                    let object_id = table_info.object_id;
774                    let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
775                    entry
776                        .entry(*compaction_group_id)
777                        .or_default()
778                        .push(table_info.sst_id)
779                }
780            }
781        }
782        ret
783    }
784
785    pub fn merge_compaction_group(
786        &mut self,
787        left_group_id: CompactionGroupId,
788        right_group_id: CompactionGroupId,
789    ) {
790        // Double check
791        let left_group_id_table_ids = self
792            .state_table_info
793            .compaction_group_member_table_ids(left_group_id)
794            .iter()
795            .map(|table_id| table_id.table_id);
796        let right_group_id_table_ids = self
797            .state_table_info
798            .compaction_group_member_table_ids(right_group_id)
799            .iter()
800            .map(|table_id| table_id.table_id);
801
802        assert!(
803            left_group_id_table_ids
804                .chain(right_group_id_table_ids)
805                .is_sorted()
806        );
807
808        let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
809        let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
810            panic!(
811                "compaction group should exist right {} all {:?}",
812                right_group_id, total_cg
813            )
814        });
815
816        let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
817            panic!(
818                "compaction group should exist left {} all {:?}",
819                left_group_id, total_cg
820            )
821        });
822
823        group_split::merge_levels(left_levels, right_levels);
824    }
825
    /// Initializes compaction group `group_id` with the SSTs split off its parent group
    /// `parent_group_id` at `split_key`.
    ///
    /// - If `parent_group_id` is `StaticCompactionGroupId::NewCompactionGroup`, there is no
    ///   parent to split and the function returns immediately. `new_sst_start_id` is expected to
    ///   be 0 in that case: a non-zero value panics in debug builds and logs a warning otherwise.
    /// - Otherwise, the `compaction_group_version_id` of both groups is incremented. Every L0
    ///   sub-level and non-L0 level of the parent is passed to
    ///   `group_split::split_sst_info_for_level_v2`. The SSTs it returns are added to the
    ///   corresponding sub-level or level of `group_id`. Parent SSTs left with no table ids are
    ///   removed, and their sizes are subtracted from the parent's totals.
    /// - If `split_key` is `None`, no SSTs are moved.
    ///
    /// Ids for the branched SSTs come from a counter that starts at `new_sst_start_id` and is
    /// handed to the split helper as `&mut` (presumably advanced per new SST; TODO confirm).
    ///
    /// # Panics
    /// Panics if `parent_group_id` or `group_id` has no entry in `self.levels`, if a child
    /// non-L0 level is not concatenable after insertion, or if either group ends up with an
    /// empty L0 sub-level.
    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: HummockSstableId,
        split_key: Option<Bytes>,
    ) {
        let mut new_sst_id = new_sst_start_id;
        // A brand-new group has no parent SSTs to split; nothing to do.
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        // Borrow the parent and child groups mutably at the same time; both must exist.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Remove SST from sub level may result in empty sub level. It will be purged
                // whenever another compaction task is finished.
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                if insert_table_infos.is_empty() {
                    continue;
                }

                // Drop parent SSTs that hold no table after the split, keeping the sub-level
                // and L0 size totals in sync.
                sub_level
                    .table_infos
                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                    .for_each(|sst_info| {
                        sub_level.total_file_size -= sst_info.sst_size;
                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                        l0.total_file_size -= sst_info.sst_size;
                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
                    });
                // `Ok(idx)`: add to the existing child sub-level at `idx`;
                // `Err(idx)`: create a new child sub-level at position `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }

            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
        }

        // Non-L0 levels are matched by index between parent and child.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            // Keep the child level ordered by key range; it must stay concatenable.
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            // Drop parent SSTs that hold no table after the split and update the level sizes.
            level
                .table_infos
                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
                .for_each(|sst_info| {
                    level.total_file_size -= sst_info.sst_size;
                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
                });
        }

        // Neither group may be left with an empty L0 sub-level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
963}
964
965impl<T> HummockVersionCommon<T>
966where
967    T: SstableIdReader + ObjectIdReader,
968{
969    pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
970        self.get_sst_infos(exclude_change_log)
971            .map(|s| s.object_id())
972            .collect()
973    }
974
975    pub fn get_object_ids(
976        &self,
977        exclude_change_log: bool,
978    ) -> impl Iterator<Item = HummockObjectId> + '_ {
979        // DO NOT REMOVE THIS LINE
980        // This is to ensure that when adding new variant to `HummockObjectId`,
981        // the compiler will warn us if we forget to handle it here.
982        match HummockObjectId::Sstable(0.into()) {
983            HummockObjectId::Sstable(_) => {}
984            HummockObjectId::VectorFile(_) => {}
985        };
986        self.get_sst_infos(exclude_change_log)
987            .map(|s| HummockObjectId::Sstable(s.object_id()))
988            .chain(
989                self.vector_indexes
990                    .values()
991                    .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
992            )
993    }
994
995    pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
996        self.get_sst_infos(exclude_change_log)
997            .map(|s| s.sst_id())
998            .collect()
999    }
1000
1001    pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
1002        let may_table_change_log = if exclude_change_log {
1003            None
1004        } else {
1005            Some(self.table_change_log.values())
1006        };
1007        self.get_combined_levels()
1008            .flat_map(|level| level.table_infos.iter())
1009            .chain(
1010                may_table_change_log
1011                    .map(|v| {
1012                        v.flat_map(|table_change_log| {
1013                            table_change_log.iter().flat_map(|epoch_change_log| {
1014                                epoch_change_log
1015                                    .old_value
1016                                    .iter()
1017                                    .chain(epoch_change_log.new_value.iter())
1018                            })
1019                        })
1020                    })
1021                    .into_iter()
1022                    .flatten(),
1023            )
1024    }
1025}
1026
impl Levels {
    /// Applies one compaction-generated intra-level delta to this compaction group's levels.
    ///
    /// Steps, in order:
    /// 1. If `is_compaction_task_expired` reports the delta's `compaction_group_version_id` as
    ///    expired relative to `self.compaction_group_version_id`, a warning is logged and the
    ///    delta is ignored.
    /// 2. SSTs whose `sst_id` is in the delta's `removed_table_ids` set (bound here as
    ///    `delete_sst_ids_set` and matched against `sst_id` by `level_delete_ssts`) are deleted.
    ///    For `level_idx == 0` they are deleted from every L0 sub-level, otherwise from
    ///    `levels[level_idx - 1]`.
    /// 3. `inserted_table_infos` are inserted into the L0 sub-level whose id is
    ///    `l0_sub_level_id` when `level_idx == 0`, otherwise into `levels[level_idx - 1]`. The
    ///    target level's `vnode_partition_count` may be updated from the delta.
    ///
    /// This method does not recompute `l0.total_file_size` / `l0.uncompressed_file_size`;
    /// `post_apply_l0_compact` does that.
    ///
    /// # Panics
    /// Panics when inserting into L0 and no sub-level has id `l0_sub_level_id`.
    pub(crate) fn apply_compact_ssts(
        &mut self,
        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
        member_table_ids: &BTreeSet<TableId>,
    ) {
        let IntraLevelDeltaCommon {
            level_idx,
            l0_sub_level_id,
            inserted_table_infos: insert_table_infos,
            vnode_partition_count,
            removed_table_ids: delete_sst_ids_set,
            compaction_group_version_id,
        } = level_delta;
        let new_vnode_partition_count = *vnode_partition_count;

        // Compaction group operations such as split bump `compaction_group_version_id`, so a
        // delta from a task created before such an operation is rejected here.
        if is_compaction_task_expired(
            self.compaction_group_version_id,
            *compaction_group_version_id,
        ) {
            warn!(
                current_compaction_group_version_id = self.compaction_group_version_id,
                delta_compaction_group_version_id = compaction_group_version_id,
                level_idx,
                l0_sub_level_id,
                insert_table_infos = ?insert_table_infos
                    .iter()
                    .map(|sst| (sst.sst_id, sst.object_id))
                    .collect_vec(),
                ?delete_sst_ids_set,
                "This VersionDelta may be committed by an expired compact task. Please check it."
            );
            return;
        }
        // For L0, every sub-level is scanned; otherwise only `levels[level_idx - 1]`.
        if !delete_sst_ids_set.is_empty() {
            if *level_idx == 0 {
                for level in &mut self.l0.sub_levels {
                    level_delete_ssts(level, delete_sst_ids_set);
                }
            } else {
                let idx = *level_idx as usize - 1;
                level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
            }
        }

        if !insert_table_infos.is_empty() {
            let insert_sst_level_id = *level_idx;
            let insert_sub_level_id = *l0_sub_level_id;
            if insert_sst_level_id == 0 {
                let l0 = &mut self.l0;
                // `partition_point` assumes `sub_levels` is ordered by `sub_level_id`.
                let index = l0
                    .sub_levels
                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
                assert!(
                    index < l0.sub_levels.len()
                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
                    "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
                    insert_sub_level_id,
                    delete_sst_ids_set,
                    l0.sub_levels
                        .iter()
                        .map(|level| level.sub_level_id)
                        .collect_vec()
                );
                if l0.sub_levels[index].table_infos.is_empty()
                    && member_table_ids.len() == 1
                    && insert_table_infos.iter().all(|sst| {
                        sst.table_ids.len() == 1
                            && sst.table_ids[0]
                                == member_table_ids.iter().next().expect("non-empty").table_id
                    })
                {
                    // Only change vnode_partition_count for group which has only one state-table.
                    // Only change vnode_partition_count for level which update all sst files in this compact task.
                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
                }
                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
            } else {
                let idx = insert_sst_level_id as usize - 1;
                // An empty target level adopts the delta's partition count when every inserted
                // SST holds exactly one table. Otherwise, partitioning is reset to 0 when the
                // delta is unpartitioned and the group has more than one member table.
                if self.levels[idx].table_infos.is_empty()
                    && insert_table_infos
                        .iter()
                        .all(|sst| sst.table_ids.len() == 1)
                {
                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
                } else if self.levels[idx].vnode_partition_count != 0
                    && new_vnode_partition_count == 0
                    && member_table_ids.len() > 1
                {
                    self.levels[idx].vnode_partition_count = 0;
                }
                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
            }
        }
    }

    /// Drops empty L0 sub-levels, then recomputes `l0.total_file_size` and
    /// `l0.uncompressed_file_size` as sums over the remaining sub-levels.
    pub(crate) fn post_apply_l0_compact(&mut self) {
        {
            self.l0
                .sub_levels
                .retain(|level| !level.table_infos.is_empty());
            self.l0.total_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.total_file_size)
                .sum::<u64>();
            self.l0.uncompressed_file_size = self
                .l0
                .sub_levels
                .iter()
                .map(|level| level.uncompressed_file_size)
                .sum::<u64>();
        }
    }
}
1143
1144impl<T, L> HummockVersionCommon<T, L> {
1145    pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1146        self.levels
1147            .values()
1148            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1149    }
1150}
1151
1152pub fn build_initial_compaction_group_levels(
1153    group_id: CompactionGroupId,
1154    compaction_config: &CompactionConfig,
1155) -> Levels {
1156    let mut levels = vec![];
1157    for l in 0..compaction_config.get_max_level() {
1158        levels.push(Level {
1159            level_idx: (l + 1) as u32,
1160            level_type: PbLevelType::Nonoverlapping,
1161            table_infos: vec![],
1162            total_file_size: 0,
1163            sub_level_id: 0,
1164            uncompressed_file_size: 0,
1165            vnode_partition_count: 0,
1166        });
1167    }
1168    #[expect(deprecated)] // for backward-compatibility of previous hummock version delta
1169    Levels {
1170        levels,
1171        l0: OverlappingLevel {
1172            sub_levels: vec![],
1173            total_file_size: 0,
1174            uncompressed_file_size: 0,
1175        },
1176        group_id,
1177        parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
1178        member_table_ids: vec![],
1179        compaction_group_version_id: 0,
1180    }
1181}
1182
1183fn split_sst_info_for_level(
1184    member_table_ids: &BTreeSet<u32>,
1185    level: &mut Level,
1186    new_sst_id: &mut HummockSstableId,
1187) -> Vec<SstableInfo> {
1188    // Remove SST from sub level may result in empty sub level. It will be purged
1189    // whenever another compaction task is finished.
1190    let mut insert_table_infos = vec![];
1191    for sst_info in &mut level.table_infos {
1192        let removed_table_ids = sst_info
1193            .table_ids
1194            .iter()
1195            .filter(|table_id| member_table_ids.contains(table_id))
1196            .cloned()
1197            .collect_vec();
1198        let sst_size = sst_info.sst_size;
1199        if sst_size / 2 == 0 {
1200            tracing::warn!(
1201                id = %sst_info.sst_id,
1202                object_id = %sst_info.object_id,
1203                sst_size = sst_info.sst_size,
1204                file_size = sst_info.file_size,
1205                "Sstable sst_size is under expected",
1206            );
1207        };
1208        if !removed_table_ids.is_empty() {
1209            let (modified_sst, branch_sst) = split_sst_with_table_ids(
1210                sst_info,
1211                new_sst_id,
1212                sst_size / 2,
1213                sst_size / 2,
1214                member_table_ids.iter().cloned().collect_vec(),
1215            );
1216            *sst_info = modified_sst;
1217            insert_table_infos.push(branch_sst);
1218        }
1219    }
1220    insert_table_infos
1221}
1222
1223/// Gets all compaction group ids.
1224pub fn get_compaction_group_ids(
1225    version: &HummockVersion,
1226) -> impl Iterator<Item = CompactionGroupId> + '_ {
1227    version.levels.keys().cloned()
1228}
1229
1230pub fn get_table_compaction_group_id_mapping(
1231    version: &HummockVersion,
1232) -> HashMap<StateTableId, CompactionGroupId> {
1233    version
1234        .state_table_info
1235        .info()
1236        .iter()
1237        .map(|(table_id, info)| (table_id.table_id, info.compaction_group_id))
1238        .collect()
1239}
1240
1241/// Gets all SSTs in `group_id`
1242pub fn get_compaction_group_ssts(
1243    version: &HummockVersion,
1244    group_id: CompactionGroupId,
1245) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1246    let group_levels = version.get_compaction_group_levels(group_id);
1247    group_levels
1248        .l0
1249        .sub_levels
1250        .iter()
1251        .rev()
1252        .chain(group_levels.levels.iter())
1253        .flat_map(|level| {
1254            level
1255                .table_infos
1256                .iter()
1257                .map(|table_info| (table_info.object_id, table_info.sst_id))
1258        })
1259}
1260
1261pub fn new_sub_level(
1262    sub_level_id: u64,
1263    level_type: PbLevelType,
1264    table_infos: Vec<SstableInfo>,
1265) -> Level {
1266    if level_type == PbLevelType::Nonoverlapping {
1267        debug_assert!(
1268            can_concat(&table_infos),
1269            "sst of non-overlapping level is not concat-able: {:?}",
1270            table_infos
1271        );
1272    }
1273    let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1274    let uncompressed_file_size = table_infos
1275        .iter()
1276        .map(|table| table.uncompressed_file_size)
1277        .sum();
1278    Level {
1279        level_idx: 0,
1280        level_type,
1281        table_infos,
1282        total_file_size,
1283        sub_level_id,
1284        uncompressed_file_size,
1285        vnode_partition_count: 0,
1286    }
1287}
1288
1289pub fn add_ssts_to_sub_level(
1290    l0: &mut OverlappingLevel,
1291    sub_level_idx: usize,
1292    insert_table_infos: Vec<SstableInfo>,
1293) {
1294    insert_table_infos.iter().for_each(|sst| {
1295        l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1296        l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1297        l0.total_file_size += sst.sst_size;
1298        l0.uncompressed_file_size += sst.uncompressed_file_size;
1299    });
1300    l0.sub_levels[sub_level_idx]
1301        .table_infos
1302        .extend(insert_table_infos);
1303    if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1304        l0.sub_levels[sub_level_idx]
1305            .table_infos
1306            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1307        assert!(
1308            can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1309            "sstable ids: {:?}",
1310            l0.sub_levels[sub_level_idx]
1311                .table_infos
1312                .iter()
1313                .map(|sst| sst.sst_id)
1314                .collect_vec()
1315        );
1316    }
1317}
1318
1319/// `None` value of `sub_level_insert_hint` means append.
1320pub fn insert_new_sub_level(
1321    l0: &mut OverlappingLevel,
1322    insert_sub_level_id: u64,
1323    level_type: PbLevelType,
1324    insert_table_infos: Vec<SstableInfo>,
1325    sub_level_insert_hint: Option<usize>,
1326) {
1327    if insert_sub_level_id == u64::MAX {
1328        return;
1329    }
1330    let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1331        insert_pos
1332    } else {
1333        if let Some(newest_level) = l0.sub_levels.last() {
1334            assert!(
1335                newest_level.sub_level_id < insert_sub_level_id,
1336                "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1337                newest_level.sub_level_id,
1338                insert_sub_level_id,
1339                l0,
1340            );
1341        }
1342        l0.sub_levels.len()
1343    };
1344    #[cfg(debug_assertions)]
1345    {
1346        if insert_pos > 0 {
1347            if let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1) {
1348                debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1349            }
1350        }
1351        if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1352            debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1353        }
1354    }
1355    // All files will be committed in one new Overlapping sub-level and become
1356    // Nonoverlapping  after at least one compaction.
1357    let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1358    l0.total_file_size += level.total_file_size;
1359    l0.uncompressed_file_size += level.uncompressed_file_size;
1360    l0.sub_levels.insert(insert_pos, level);
1361}
1362
1363/// Delete sstables if the table id is in the id set.
1364///
1365/// Return `true` if some sst is deleted, and `false` is the deletion is trivial
1366fn level_delete_ssts(
1367    operand: &mut Level,
1368    delete_sst_ids_superset: &HashSet<HummockSstableId>,
1369) -> bool {
1370    let original_len = operand.table_infos.len();
1371    operand
1372        .table_infos
1373        .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1374    operand.total_file_size = operand
1375        .table_infos
1376        .iter()
1377        .map(|table| table.sst_size)
1378        .sum::<u64>();
1379    operand.uncompressed_file_size = operand
1380        .table_infos
1381        .iter()
1382        .map(|table| table.uncompressed_file_size)
1383        .sum::<u64>();
1384    original_len != operand.table_infos.len()
1385}
1386
1387fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1388    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1389        format!(
1390            "sstable ids: {:?}",
1391            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1392        )
1393    }
1394    operand.total_file_size += insert_table_infos
1395        .iter()
1396        .map(|sst| sst.sst_size)
1397        .sum::<u64>();
1398    operand.uncompressed_file_size += insert_table_infos
1399        .iter()
1400        .map(|sst| sst.uncompressed_file_size)
1401        .sum::<u64>();
1402    if operand.level_type == PbLevelType::Overlapping {
1403        operand.level_type = PbLevelType::Nonoverlapping;
1404        operand
1405            .table_infos
1406            .extend(insert_table_infos.iter().cloned());
1407        operand
1408            .table_infos
1409            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1410        assert!(
1411            can_concat(&operand.table_infos),
1412            "{}",
1413            display_sstable_infos(&operand.table_infos)
1414        );
1415    } else if !insert_table_infos.is_empty() {
1416        let sorted_insert: Vec<_> = insert_table_infos
1417            .iter()
1418            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1419            .cloned()
1420            .collect();
1421        let first = &sorted_insert[0];
1422        let last = &sorted_insert[sorted_insert.len() - 1];
1423        let pos = operand
1424            .table_infos
1425            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1426        if pos >= operand.table_infos.len()
1427            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1428        {
1429            operand.table_infos.splice(pos..pos, sorted_insert);
1430            // Validate the inserted SST batch along with the two SSTs that precede and follow it.
1431            let validate_range = operand
1432                .table_infos
1433                .iter()
1434                .skip(pos.saturating_sub(1))
1435                .take(insert_table_infos.len() + 2)
1436                .collect_vec();
1437            assert!(
1438                can_concat(&validate_range),
1439                "{}",
1440                display_sstable_infos(&validate_range),
1441            );
1442        } else {
1443            // If this branch is reached, it indicates some unexpected behavior in compaction.
1444            // Here we issue a warning and fall back to insert one by one.
1445            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1446            for i in insert_table_infos {
1447                let pos = operand
1448                    .table_infos
1449                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1450                operand.table_infos.insert(pos, i.clone());
1451            }
1452            assert!(
1453                can_concat(&operand.table_infos),
1454                "{}",
1455                display_sstable_infos(&operand.table_infos)
1456            );
1457        }
1458    }
1459}
1460
1461pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1462    // DO NOT REMOVE THIS LINE
1463    // This is to ensure that when adding new variant to `HummockObjectId`,
1464    // the compiler will warn us if we forget to handle it here.
1465    match HummockObjectId::Sstable(0.into()) {
1466        HummockObjectId::Sstable(_) => {}
1467        HummockObjectId::VectorFile(_) => {}
1468    };
1469    version
1470        .levels
1471        .values()
1472        .flat_map(|cg| {
1473            cg.level0()
1474                .sub_levels
1475                .iter()
1476                .chain(cg.levels.iter())
1477                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1478        })
1479        .chain(version.table_change_log.values().flat_map(|c| {
1480            c.iter().flat_map(|l| {
1481                l.old_value
1482                    .iter()
1483                    .chain(l.new_value.iter())
1484                    .map(|t| (t.object_id, t.file_size))
1485            })
1486        }))
1487        .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1488        .chain(
1489            version
1490                .vector_indexes
1491                .values()
1492                .flat_map(|index| index.get_objects()),
1493        )
1494        .collect()
1495}
1496
1497/// Verify the validity of a `HummockVersion` and return a list of violations if any.
1498/// Currently this method is only used by risectl validate-version.
1499pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1500    let mut res = Vec::new();
1501    // Ensure each table maps to only one compaction group
1502    for (group_id, levels) in &version.levels {
1503        // Ensure compaction group id matches
1504        if levels.group_id != *group_id {
1505            res.push(format!(
1506                "GROUP {}: inconsistent group id {} in Levels",
1507                group_id, levels.group_id
1508            ));
1509        }
1510
1511        let validate_level = |group: CompactionGroupId,
1512                              expected_level_idx: u32,
1513                              level: &Level,
1514                              res: &mut Vec<String>| {
1515            let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1516            if level.level_idx == 0 {
1517                level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1518                // Ensure sub-level is not empty
1519                if level.table_infos.is_empty() {
1520                    res.push(format!("{}: empty level", level_identifier));
1521                }
1522            } else if level.level_type != PbLevelType::Nonoverlapping {
1523                // Ensure non-L0 level is non-overlapping level
1524                res.push(format!(
1525                    "{}: level type {:?} is not non-overlapping",
1526                    level_identifier, level.level_type
1527                ));
1528            }
1529
1530            // Ensure level idx matches
1531            if level.level_idx != expected_level_idx {
1532                res.push(format!(
1533                    "{}: mismatched level idx {}",
1534                    level_identifier, expected_level_idx
1535                ));
1536            }
1537
1538            let mut prev_table_info: Option<&SstableInfo> = None;
1539            for table_info in &level.table_infos {
1540                // Ensure table_ids are sorted and unique
1541                if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1542                    res.push(format!(
1543                        "{} SST {}: table_ids not sorted",
1544                        level_identifier, table_info.object_id
1545                    ));
1546                }
1547
1548                // Ensure SSTs in non-overlapping level have non-overlapping key range
1549                if level.level_type == PbLevelType::Nonoverlapping {
1550                    if let Some(prev) = prev_table_info.take() {
1551                        if prev
1552                            .key_range
1553                            .compare_right_with(&table_info.key_range.left)
1554                            != Ordering::Less
1555                        {
1556                            res.push(format!(
1557                                "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1558                                level_identifier, table_info.object_id, prev, table_info
1559                            ));
1560                        }
1561                    }
1562                    let _ = prev_table_info.insert(table_info);
1563                }
1564            }
1565        };
1566
1567        let l0 = &levels.l0;
1568        let mut prev_sub_level_id = u64::MAX;
1569        for sub_level in &l0.sub_levels {
1570            // Ensure sub_level_id is sorted and unique
1571            if sub_level.sub_level_id >= prev_sub_level_id {
1572                res.push(format!(
1573                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1574                    group_id, sub_level.level_idx, prev_sub_level_id
1575                ));
1576            }
1577            prev_sub_level_id = sub_level.sub_level_id;
1578
1579            validate_level(*group_id, 0, sub_level, &mut res);
1580        }
1581
1582        for idx in 1..=levels.levels.len() {
1583            validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1584        }
1585    }
1586    res
1587}
1588
1589#[cfg(test)]
1590mod tests {
1591    use std::collections::{HashMap, HashSet};
1592
1593    use bytes::Bytes;
1594    use risingwave_common::catalog::TableId;
1595    use risingwave_common::hash::VirtualNode;
1596    use risingwave_common::util::epoch::test_epoch;
1597    use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1598
1599    use super::group_split;
1600    use crate::HummockVersionId;
1601    use crate::compaction_group::group_split::*;
1602    use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1603    use crate::key::{FullKey, gen_key_from_str};
1604    use crate::key_range::KeyRange;
1605    use crate::level::{Level, Levels, OverlappingLevel};
1606    use crate::sstable_info::{SstableInfo, SstableInfoInner};
1607    use crate::version::{
1608        GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1609    };
1610
1611    fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1612        gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1613    }
1614
1615    fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1616        let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1617        let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1618        let full_key_l = FullKey::for_test(
1619            TableId::new(*table_ids.first().unwrap()),
1620            table_key_l,
1621            epoch,
1622        )
1623        .encode();
1624        let full_key_r =
1625            FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1626                .encode();
1627
1628        SstableInfoInner {
1629            sst_id: sst_id.into(),
1630            key_range: KeyRange {
1631                left: full_key_l.into(),
1632                right: full_key_r.into(),
1633                right_exclusive: false,
1634            },
1635            table_ids,
1636            object_id: sst_id.into(),
1637            min_epoch: 20,
1638            max_epoch: 20,
1639            file_size: 100,
1640            sst_size: 100,
1641            ..Default::default()
1642        }
1643    }
1644
1645    #[test]
1646    fn test_get_sst_object_ids() {
1647        let mut version = HummockVersion {
1648            id: HummockVersionId::new(0),
1649            levels: HashMap::from_iter([(
1650                0,
1651                Levels {
1652                    levels: vec![],
1653                    l0: OverlappingLevel {
1654                        sub_levels: vec![],
1655                        total_file_size: 0,
1656                        uncompressed_file_size: 0,
1657                    },
1658                    ..Default::default()
1659                },
1660            )]),
1661            ..Default::default()
1662        };
1663        assert_eq!(version.get_object_ids(false).count(), 0);
1664
1665        // Add to sub level
1666        version
1667            .levels
1668            .get_mut(&0)
1669            .unwrap()
1670            .l0
1671            .sub_levels
1672            .push(Level {
1673                table_infos: vec![
1674                    SstableInfoInner {
1675                        object_id: 11.into(),
1676                        sst_id: 11.into(),
1677                        ..Default::default()
1678                    }
1679                    .into(),
1680                ],
1681                ..Default::default()
1682            });
1683        assert_eq!(version.get_object_ids(false).count(), 1);
1684
1685        // Add to non sub level
1686        version.levels.get_mut(&0).unwrap().levels.push(Level {
1687            table_infos: vec![
1688                SstableInfoInner {
1689                    object_id: 22.into(),
1690                    sst_id: 22.into(),
1691                    ..Default::default()
1692                }
1693                .into(),
1694            ],
1695            ..Default::default()
1696        });
1697        assert_eq!(version.get_object_ids(false).count(), 2);
1698    }
1699
1700    #[test]
1701    fn test_apply_version_delta() {
1702        let mut version = HummockVersion {
1703            id: HummockVersionId::new(0),
1704            levels: HashMap::from_iter([
1705                (
1706                    0,
1707                    build_initial_compaction_group_levels(
1708                        0,
1709                        &CompactionConfig {
1710                            max_level: 6,
1711                            ..Default::default()
1712                        },
1713                    ),
1714                ),
1715                (
1716                    1,
1717                    build_initial_compaction_group_levels(
1718                        1,
1719                        &CompactionConfig {
1720                            max_level: 6,
1721                            ..Default::default()
1722                        },
1723                    ),
1724                ),
1725            ]),
1726            ..Default::default()
1727        };
1728        let version_delta = HummockVersionDelta {
1729            id: HummockVersionId::new(1),
1730            group_deltas: HashMap::from_iter([
1731                (
1732                    2,
1733                    GroupDeltas {
1734                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1735                            group_config: Some(CompactionConfig {
1736                                max_level: 6,
1737                                ..Default::default()
1738                            }),
1739                            ..Default::default()
1740                        }))],
1741                    },
1742                ),
1743                (
1744                    0,
1745                    GroupDeltas {
1746                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1747                    },
1748                ),
1749                (
1750                    1,
1751                    GroupDeltas {
1752                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1753                            1,
1754                            0,
1755                            HashSet::new(),
1756                            vec![
1757                                SstableInfoInner {
1758                                    object_id: 1.into(),
1759                                    sst_id: 1.into(),
1760                                    ..Default::default()
1761                                }
1762                                .into(),
1763                            ],
1764                            0,
1765                            version
1766                                .levels
1767                                .get(&1)
1768                                .as_ref()
1769                                .unwrap()
1770                                .compaction_group_version_id,
1771                        ))],
1772                    },
1773                ),
1774            ]),
1775            ..Default::default()
1776        };
1777        let version_delta = version_delta;
1778
1779        version.apply_version_delta(&version_delta);
1780        let mut cg1 = build_initial_compaction_group_levels(
1781            1,
1782            &CompactionConfig {
1783                max_level: 6,
1784                ..Default::default()
1785            },
1786        );
1787        cg1.levels[0] = Level {
1788            level_idx: 1,
1789            level_type: LevelType::Nonoverlapping,
1790            table_infos: vec![
1791                SstableInfoInner {
1792                    object_id: 1.into(),
1793                    sst_id: 1.into(),
1794                    ..Default::default()
1795                }
1796                .into(),
1797            ],
1798            ..Default::default()
1799        };
1800        assert_eq!(
1801            version,
1802            HummockVersion {
1803                id: HummockVersionId::new(1),
1804                levels: HashMap::from_iter([
1805                    (
1806                        2,
1807                        build_initial_compaction_group_levels(
1808                            2,
1809                            &CompactionConfig {
1810                                max_level: 6,
1811                                ..Default::default()
1812                            },
1813                        ),
1814                    ),
1815                    (1, cg1),
1816                ]),
1817                ..Default::default()
1818            }
1819        );
1820    }
1821
1822    fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1823        gen_sst_info_impl(object_id, table_ids, left, right).into()
1824    }
1825
1826    fn gen_sst_info_impl(
1827        object_id: u64,
1828        table_ids: Vec<u32>,
1829        left: Bytes,
1830        right: Bytes,
1831    ) -> SstableInfoInner {
1832        SstableInfoInner {
1833            object_id: object_id.into(),
1834            sst_id: object_id.into(),
1835            key_range: KeyRange {
1836                left,
1837                right,
1838                right_exclusive: false,
1839            },
1840            table_ids,
1841            file_size: 100,
1842            sst_size: 100,
1843            uncompressed_file_size: 100,
1844            ..Default::default()
1845        }
1846    }
1847
1848    #[test]
1849    fn test_merge_levels() {
1850        let mut left_levels = build_initial_compaction_group_levels(
1851            1,
1852            &CompactionConfig {
1853                max_level: 6,
1854                ..Default::default()
1855            },
1856        );
1857
1858        let mut right_levels = build_initial_compaction_group_levels(
1859            2,
1860            &CompactionConfig {
1861                max_level: 6,
1862                ..Default::default()
1863            },
1864        );
1865
1866        left_levels.levels[0] = Level {
1867            level_idx: 1,
1868            level_type: LevelType::Nonoverlapping,
1869            table_infos: vec![
1870                gen_sst_info(
1871                    1,
1872                    vec![3],
1873                    FullKey::for_test(
1874                        TableId::new(3),
1875                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1876                        0,
1877                    )
1878                    .encode()
1879                    .into(),
1880                    FullKey::for_test(
1881                        TableId::new(3),
1882                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1883                        0,
1884                    )
1885                    .encode()
1886                    .into(),
1887                ),
1888                gen_sst_info(
1889                    10,
1890                    vec![3, 4],
1891                    FullKey::for_test(
1892                        TableId::new(3),
1893                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1894                        0,
1895                    )
1896                    .encode()
1897                    .into(),
1898                    FullKey::for_test(
1899                        TableId::new(4),
1900                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1901                        0,
1902                    )
1903                    .encode()
1904                    .into(),
1905                ),
1906                gen_sst_info(
1907                    11,
1908                    vec![4],
1909                    FullKey::for_test(
1910                        TableId::new(4),
1911                        gen_key_from_str(VirtualNode::from_index(11), "1"),
1912                        0,
1913                    )
1914                    .encode()
1915                    .into(),
1916                    FullKey::for_test(
1917                        TableId::new(4),
1918                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1919                        0,
1920                    )
1921                    .encode()
1922                    .into(),
1923                ),
1924            ],
1925            total_file_size: 300,
1926            ..Default::default()
1927        };
1928
1929        left_levels.l0.sub_levels.push(Level {
1930            level_idx: 0,
1931            table_infos: vec![gen_sst_info(
1932                3,
1933                vec![3],
1934                FullKey::for_test(
1935                    TableId::new(3),
1936                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1937                    0,
1938                )
1939                .encode()
1940                .into(),
1941                FullKey::for_test(
1942                    TableId::new(3),
1943                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1944                    0,
1945                )
1946                .encode()
1947                .into(),
1948            )],
1949            sub_level_id: 101,
1950            level_type: LevelType::Overlapping,
1951            total_file_size: 100,
1952            ..Default::default()
1953        });
1954
1955        left_levels.l0.sub_levels.push(Level {
1956            level_idx: 0,
1957            table_infos: vec![gen_sst_info(
1958                3,
1959                vec![3],
1960                FullKey::for_test(
1961                    TableId::new(3),
1962                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1963                    0,
1964                )
1965                .encode()
1966                .into(),
1967                FullKey::for_test(
1968                    TableId::new(3),
1969                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1970                    0,
1971                )
1972                .encode()
1973                .into(),
1974            )],
1975            sub_level_id: 103,
1976            level_type: LevelType::Overlapping,
1977            total_file_size: 100,
1978            ..Default::default()
1979        });
1980
1981        left_levels.l0.sub_levels.push(Level {
1982            level_idx: 0,
1983            table_infos: vec![gen_sst_info(
1984                3,
1985                vec![3],
1986                FullKey::for_test(
1987                    TableId::new(3),
1988                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1989                    0,
1990                )
1991                .encode()
1992                .into(),
1993                FullKey::for_test(
1994                    TableId::new(3),
1995                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1996                    0,
1997                )
1998                .encode()
1999                .into(),
2000            )],
2001            sub_level_id: 105,
2002            level_type: LevelType::Nonoverlapping,
2003            total_file_size: 100,
2004            ..Default::default()
2005        });
2006
2007        right_levels.levels[0] = Level {
2008            level_idx: 1,
2009            level_type: LevelType::Nonoverlapping,
2010            table_infos: vec![
2011                gen_sst_info(
2012                    1,
2013                    vec![5],
2014                    FullKey::for_test(
2015                        TableId::new(5),
2016                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2017                        0,
2018                    )
2019                    .encode()
2020                    .into(),
2021                    FullKey::for_test(
2022                        TableId::new(5),
2023                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2024                        0,
2025                    )
2026                    .encode()
2027                    .into(),
2028                ),
2029                gen_sst_info(
2030                    10,
2031                    vec![5, 6],
2032                    FullKey::for_test(
2033                        TableId::new(5),
2034                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2035                        0,
2036                    )
2037                    .encode()
2038                    .into(),
2039                    FullKey::for_test(
2040                        TableId::new(6),
2041                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2042                        0,
2043                    )
2044                    .encode()
2045                    .into(),
2046                ),
2047                gen_sst_info(
2048                    11,
2049                    vec![6],
2050                    FullKey::for_test(
2051                        TableId::new(6),
2052                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2053                        0,
2054                    )
2055                    .encode()
2056                    .into(),
2057                    FullKey::for_test(
2058                        TableId::new(6),
2059                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2060                        0,
2061                    )
2062                    .encode()
2063                    .into(),
2064                ),
2065            ],
2066            total_file_size: 300,
2067            ..Default::default()
2068        };
2069
2070        right_levels.l0.sub_levels.push(Level {
2071            level_idx: 0,
2072            table_infos: vec![gen_sst_info(
2073                3,
2074                vec![5],
2075                FullKey::for_test(
2076                    TableId::new(5),
2077                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2078                    0,
2079                )
2080                .encode()
2081                .into(),
2082                FullKey::for_test(
2083                    TableId::new(5),
2084                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2085                    0,
2086                )
2087                .encode()
2088                .into(),
2089            )],
2090            sub_level_id: 101,
2091            level_type: LevelType::Overlapping,
2092            total_file_size: 100,
2093            ..Default::default()
2094        });
2095
2096        right_levels.l0.sub_levels.push(Level {
2097            level_idx: 0,
2098            table_infos: vec![gen_sst_info(
2099                5,
2100                vec![5],
2101                FullKey::for_test(
2102                    TableId::new(5),
2103                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2104                    0,
2105                )
2106                .encode()
2107                .into(),
2108                FullKey::for_test(
2109                    TableId::new(5),
2110                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2111                    0,
2112                )
2113                .encode()
2114                .into(),
2115            )],
2116            sub_level_id: 102,
2117            level_type: LevelType::Overlapping,
2118            total_file_size: 100,
2119            ..Default::default()
2120        });
2121
2122        right_levels.l0.sub_levels.push(Level {
2123            level_idx: 0,
2124            table_infos: vec![gen_sst_info(
2125                3,
2126                vec![5],
2127                FullKey::for_test(
2128                    TableId::new(5),
2129                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2130                    0,
2131                )
2132                .encode()
2133                .into(),
2134                FullKey::for_test(
2135                    TableId::new(5),
2136                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2137                    0,
2138                )
2139                .encode()
2140                .into(),
2141            )],
2142            sub_level_id: 103,
2143            level_type: LevelType::Nonoverlapping,
2144            total_file_size: 100,
2145            ..Default::default()
2146        });
2147
2148        {
2149            // test empty
2150            let mut left_levels = Levels::default();
2151            let right_levels = Levels::default();
2152
2153            group_split::merge_levels(&mut left_levels, right_levels);
2154        }
2155
2156        {
2157            // test empty left
2158            let mut left_levels = build_initial_compaction_group_levels(
2159                1,
2160                &CompactionConfig {
2161                    max_level: 6,
2162                    ..Default::default()
2163                },
2164            );
2165            let right_levels = right_levels.clone();
2166
2167            group_split::merge_levels(&mut left_levels, right_levels);
2168
2169            assert!(left_levels.l0.sub_levels.len() == 3);
2170            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2171            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2172            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2173            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2174            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2175            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2176
2177            assert!(left_levels.levels[0].level_idx == 1);
2178            assert_eq!(300, left_levels.levels[0].total_file_size);
2179        }
2180
2181        {
2182            // test empty right
2183            let mut left_levels = left_levels.clone();
2184            let right_levels = build_initial_compaction_group_levels(
2185                2,
2186                &CompactionConfig {
2187                    max_level: 6,
2188                    ..Default::default()
2189                },
2190            );
2191
2192            group_split::merge_levels(&mut left_levels, right_levels);
2193
2194            assert!(left_levels.l0.sub_levels.len() == 3);
2195            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2196            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2197            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2198            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2199            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2200            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2201
2202            assert!(left_levels.levels[0].level_idx == 1);
2203            assert_eq!(300, left_levels.levels[0].total_file_size);
2204        }
2205
2206        {
2207            let mut left_levels = left_levels.clone();
2208            let right_levels = right_levels.clone();
2209
2210            group_split::merge_levels(&mut left_levels, right_levels);
2211
2212            assert!(left_levels.l0.sub_levels.len() == 6);
2213            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2214            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2215            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2216            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2217            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2218            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2219            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2220            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2221            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2222            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2223            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2224            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2225
2226            assert!(left_levels.levels[0].level_idx == 1);
2227            assert_eq!(600, left_levels.levels[0].total_file_size);
2228        }
2229    }
2230
2231    #[test]
2232    fn test_get_split_pos() {
2233        let epoch = test_epoch(1);
2234        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2235        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2236        let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2237
2238        let ssts = vec![s1, s2, s3];
2239        let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2240
2241        let pos = group_split::get_split_pos(&ssts, split_key.clone());
2242        assert_eq!(1, pos);
2243
2244        let pos = group_split::get_split_pos(&vec![], split_key);
2245        assert_eq!(0, pos);
2246    }
2247
2248    #[test]
2249    fn test_split_sst() {
2250        let epoch = test_epoch(1);
2251        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2252
2253        {
2254            let split_key = group_split::build_split_key(3, VirtualNode::ZERO);
2255            let origin_sst = sst.clone();
2256            let sst_size = origin_sst.sst_size;
2257            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2258            assert_eq!(SstSplitType::Both, split_type);
2259
2260            let mut new_sst_id = 10.into();
2261            let (origin_sst, branched_sst) = group_split::split_sst(
2262                origin_sst,
2263                &mut new_sst_id,
2264                split_key,
2265                sst_size / 2,
2266                sst_size / 2,
2267            );
2268
2269            let origin_sst = origin_sst.unwrap();
2270            let branched_sst = branched_sst.unwrap();
2271
2272            assert!(origin_sst.key_range.right_exclusive);
2273            assert!(
2274                origin_sst
2275                    .key_range
2276                    .right
2277                    .cmp(&branched_sst.key_range.left)
2278                    .is_le()
2279            );
2280            assert!(origin_sst.table_ids.is_sorted());
2281            assert!(branched_sst.table_ids.is_sorted());
2282            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2283            assert!(branched_sst.sst_size < origin_sst.file_size);
2284            assert_eq!(10, branched_sst.sst_id);
2285            assert_eq!(11, origin_sst.sst_id);
2286            assert_eq!(&3, branched_sst.table_ids.first().unwrap()); // split table_id to right
2287        }
2288
2289        {
2290            // test un-exist table_id
2291            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2292            let origin_sst = sst.clone();
2293            let sst_size = origin_sst.sst_size;
2294            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2295            assert_eq!(SstSplitType::Both, split_type);
2296
2297            let mut new_sst_id = 10.into();
2298            let (origin_sst, branched_sst) = group_split::split_sst(
2299                origin_sst,
2300                &mut new_sst_id,
2301                split_key,
2302                sst_size / 2,
2303                sst_size / 2,
2304            );
2305
2306            let origin_sst = origin_sst.unwrap();
2307            let branched_sst = branched_sst.unwrap();
2308
2309            assert!(origin_sst.key_range.right_exclusive);
2310            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2311            assert!(origin_sst.table_ids.is_sorted());
2312            assert!(branched_sst.table_ids.is_sorted());
2313            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2314            assert!(branched_sst.sst_size < origin_sst.file_size);
2315            assert_eq!(10, branched_sst.sst_id);
2316            assert_eq!(11, origin_sst.sst_id);
2317            assert_eq!(&5, branched_sst.table_ids.first().unwrap()); // split table_id to right
2318        }
2319
2320        {
2321            let split_key = group_split::build_split_key(6, VirtualNode::ZERO);
2322            let origin_sst = sst.clone();
2323            let split_type = group_split::need_to_split(&origin_sst, split_key);
2324            assert_eq!(SstSplitType::Left, split_type);
2325        }
2326
2327        {
2328            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2329            let origin_sst = sst.clone();
2330            let split_type = group_split::need_to_split(&origin_sst, split_key);
2331            assert_eq!(SstSplitType::Both, split_type);
2332
2333            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2334            let origin_sst = sst.clone();
2335            let split_type = group_split::need_to_split(&origin_sst, split_key);
2336            assert_eq!(SstSplitType::Right, split_type);
2337        }
2338
2339        {
2340            // test key_range left = right
2341            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2342            sst.key_range.right = sst.key_range.left.clone();
2343            let sst: SstableInfo = sst.into();
2344            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2345            let origin_sst = sst.clone();
2346            let sst_size = origin_sst.sst_size;
2347
2348            let mut new_sst_id = 10.into();
2349            let (origin_sst, branched_sst) = group_split::split_sst(
2350                origin_sst,
2351                &mut new_sst_id,
2352                split_key,
2353                sst_size / 2,
2354                sst_size / 2,
2355            );
2356
2357            assert!(origin_sst.is_none());
2358            assert!(branched_sst.is_some());
2359        }
2360    }
2361
2362    #[test]
2363    fn test_split_sst_info_for_level() {
2364        let mut version = HummockVersion {
2365            id: HummockVersionId(0),
2366            levels: HashMap::from_iter([(
2367                1,
2368                build_initial_compaction_group_levels(
2369                    1,
2370                    &CompactionConfig {
2371                        max_level: 6,
2372                        ..Default::default()
2373                    },
2374                ),
2375            )]),
2376            ..Default::default()
2377        };
2378
2379        let cg1 = version.levels.get_mut(&1).unwrap();
2380
2381        cg1.levels[0] = Level {
2382            level_idx: 1,
2383            level_type: LevelType::Nonoverlapping,
2384            table_infos: vec![
2385                gen_sst_info(
2386                    1,
2387                    vec![3],
2388                    FullKey::for_test(
2389                        TableId::new(3),
2390                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2391                        0,
2392                    )
2393                    .encode()
2394                    .into(),
2395                    FullKey::for_test(
2396                        TableId::new(3),
2397                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2398                        0,
2399                    )
2400                    .encode()
2401                    .into(),
2402                ),
2403                gen_sst_info(
2404                    10,
2405                    vec![3, 4],
2406                    FullKey::for_test(
2407                        TableId::new(3),
2408                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2409                        0,
2410                    )
2411                    .encode()
2412                    .into(),
2413                    FullKey::for_test(
2414                        TableId::new(4),
2415                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2416                        0,
2417                    )
2418                    .encode()
2419                    .into(),
2420                ),
2421                gen_sst_info(
2422                    11,
2423                    vec![4],
2424                    FullKey::for_test(
2425                        TableId::new(4),
2426                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2427                        0,
2428                    )
2429                    .encode()
2430                    .into(),
2431                    FullKey::for_test(
2432                        TableId::new(4),
2433                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2434                        0,
2435                    )
2436                    .encode()
2437                    .into(),
2438                ),
2439            ],
2440            total_file_size: 300,
2441            ..Default::default()
2442        };
2443
2444        cg1.l0.sub_levels.push(Level {
2445            level_idx: 0,
2446            table_infos: vec![
2447                gen_sst_info(
2448                    2,
2449                    vec![2],
2450                    FullKey::for_test(
2451                        TableId::new(0),
2452                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2453                        0,
2454                    )
2455                    .encode()
2456                    .into(),
2457                    FullKey::for_test(
2458                        TableId::new(2),
2459                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2460                        0,
2461                    )
2462                    .encode()
2463                    .into(),
2464                ),
2465                gen_sst_info(
2466                    22,
2467                    vec![2],
2468                    FullKey::for_test(
2469                        TableId::new(0),
2470                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2471                        0,
2472                    )
2473                    .encode()
2474                    .into(),
2475                    FullKey::for_test(
2476                        TableId::new(2),
2477                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2478                        0,
2479                    )
2480                    .encode()
2481                    .into(),
2482                ),
2483                gen_sst_info(
2484                    23,
2485                    vec![2],
2486                    FullKey::for_test(
2487                        TableId::new(0),
2488                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2489                        0,
2490                    )
2491                    .encode()
2492                    .into(),
2493                    FullKey::for_test(
2494                        TableId::new(2),
2495                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2496                        0,
2497                    )
2498                    .encode()
2499                    .into(),
2500                ),
2501                gen_sst_info(
2502                    24,
2503                    vec![2],
2504                    FullKey::for_test(
2505                        TableId::new(2),
2506                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2507                        0,
2508                    )
2509                    .encode()
2510                    .into(),
2511                    FullKey::for_test(
2512                        TableId::new(2),
2513                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2514                        0,
2515                    )
2516                    .encode()
2517                    .into(),
2518                ),
2519                gen_sst_info(
2520                    25,
2521                    vec![2],
2522                    FullKey::for_test(
2523                        TableId::new(0),
2524                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2525                        0,
2526                    )
2527                    .encode()
2528                    .into(),
2529                    FullKey::for_test(
2530                        TableId::new(0),
2531                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2532                        0,
2533                    )
2534                    .encode()
2535                    .into(),
2536                ),
2537            ],
2538            sub_level_id: 101,
2539            level_type: LevelType::Overlapping,
2540            total_file_size: 300,
2541            ..Default::default()
2542        });
2543
2544        {
2545            // split Overlapping level
2546            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2547
2548            let mut new_sst_id = 100.into();
2549            let x = group_split::split_sst_info_for_level_v2(
2550                &mut cg1.l0.sub_levels[0],
2551                &mut new_sst_id,
2552                split_key,
2553            );
2554            // assert_eq!(3, x.len());
2555            // assert_eq!(100, x[0].sst_id);
2556            // assert_eq!(100, x[0].sst_size);
2557            // assert_eq!(101, x[1].sst_id);
2558            // assert_eq!(100, x[1].sst_size);
2559            // assert_eq!(102, x[2].sst_id);
2560            // assert_eq!(100, x[2].sst_size);
2561
2562            let mut right_l0 = OverlappingLevel {
2563                sub_levels: vec![],
2564                total_file_size: 0,
2565                uncompressed_file_size: 0,
2566            };
2567
2568            right_l0.sub_levels.push(Level {
2569                level_idx: 0,
2570                table_infos: x,
2571                sub_level_id: 101,
2572                total_file_size: 100,
2573                level_type: LevelType::Overlapping,
2574                ..Default::default()
2575            });
2576
2577            let right_levels = Levels {
2578                levels: vec![],
2579                l0: right_l0,
2580                ..Default::default()
2581            };
2582
2583            merge_levels(cg1, right_levels);
2584        }
2585
2586        {
2587            // test split empty level
2588            let mut new_sst_id = 100.into();
2589            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2590            let x = group_split::split_sst_info_for_level_v2(
2591                &mut cg1.levels[2],
2592                &mut new_sst_id,
2593                split_key,
2594            );
2595
2596            assert!(x.is_empty());
2597        }
2598
2599        {
2600            // test split to right Nonoverlapping level
2601            let mut cg1 = cg1.clone();
2602            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2603
2604            let mut new_sst_id = 100.into();
2605            let x = group_split::split_sst_info_for_level_v2(
2606                &mut cg1.levels[0],
2607                &mut new_sst_id,
2608                split_key,
2609            );
2610
2611            assert_eq!(3, x.len());
2612            assert_eq!(1, x[0].sst_id);
2613            assert_eq!(100, x[0].sst_size);
2614            assert_eq!(10, x[1].sst_id);
2615            assert_eq!(100, x[1].sst_size);
2616            assert_eq!(11, x[2].sst_id);
2617            assert_eq!(100, x[2].sst_size);
2618
2619            assert_eq!(0, cg1.levels[0].table_infos.len());
2620        }
2621
2622        {
2623            // test split to left Nonoverlapping level
2624            let mut cg1 = cg1.clone();
2625            let split_key = group_split::build_split_key(5, VirtualNode::ZERO);
2626
2627            let mut new_sst_id = 100.into();
2628            let x = group_split::split_sst_info_for_level_v2(
2629                &mut cg1.levels[0],
2630                &mut new_sst_id,
2631                split_key,
2632            );
2633
2634            assert_eq!(0, x.len());
2635            assert_eq!(3, cg1.levels[0].table_infos.len());
2636        }
2637
2638        // {
2639        //     // test split to both Nonoverlapping level
2640        //     let mut cg1 = cg1.clone();
2641        //     let split_key = build_split_key(3, VirtualNode::MAX);
2642
2643        //     let mut new_sst_id = 100;
2644        //     let x = group_split::split_sst_info_for_level_v2(
2645        //         &mut cg1.levels[0],
2646        //         &mut new_sst_id,
2647        //         split_key,
2648        //     );
2649
2650        //     assert_eq!(2, x.len());
2651        //     assert_eq!(100, x[0].sst_id);
2652        //     assert_eq!(100 / 2, x[0].sst_size);
2653        //     assert_eq!(11, x[1].sst_id);
2654        //     assert_eq!(100, x[1].sst_size);
2655        //     assert_eq!(vec![3, 4], x[0].table_ids);
2656
2657        //     assert_eq!(2, cg1.levels[0].table_infos.len());
2658        //     assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2659        //     assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2660        //     assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2661        // }
2662
2663        {
2664            // test split to both Nonoverlapping level
2665            let mut cg1 = cg1.clone();
2666            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2667
2668            let mut new_sst_id = 100.into();
2669            let x = group_split::split_sst_info_for_level_v2(
2670                &mut cg1.levels[0],
2671                &mut new_sst_id,
2672                split_key,
2673            );
2674
2675            assert_eq!(2, x.len());
2676            assert_eq!(100, x[0].sst_id);
2677            assert_eq!(100 / 2, x[0].sst_size);
2678            assert_eq!(11, x[1].sst_id);
2679            assert_eq!(100, x[1].sst_size);
2680            assert_eq!(vec![4], x[1].table_ids);
2681
2682            assert_eq!(2, cg1.levels[0].table_infos.len());
2683            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2684            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2685            assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2686        }
2687    }
2688}