risingwave_hummock_sdk/compaction_group/
hummock_version_ext.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_pb::hummock::{
27    CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
28};
29use tracing::warn;
30
31use super::group_split::split_sst_with_table_ids;
32use super::{StateTableId, group_split};
33use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
34use crate::compact_task::is_compaction_task_expired;
35use crate::compaction_group::StaticCompactionGroupId;
36use crate::key_range::KeyRangeCommon;
37use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
38use crate::sstable_info::SstableInfo;
39use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
40use crate::vector_index::apply_vector_index_delta;
41use crate::version::{
42    GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
43    IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
44};
45use crate::{
46    CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
47};
48
/// Summary of the SST changes that one compaction group receives from a version delta.
///
/// Produced by `build_sst_delta_infos`, one entry per compaction group whose deltas are all
/// `IntraLevel` / `NewL0SubLevel`.
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    /// Level index recorded for the inserted SSTs (`0` for a new L0 sub level). When several
    /// deltas of a group insert SSTs, the level of the last such delta is kept.
    pub insert_sst_level: u32,
    /// SSTs inserted by the version delta.
    pub insert_sst_infos: Vec<SstableInfo>,
    /// Object ids of the SSTs removed by the version delta.
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}
55
/// Maps a compaction group id to a list of SST ids.
// NOTE(review): presumably the SST ids that one shared ("branched") object is known by in each
// compaction group — confirm against the code that builds this map.
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
57
58impl<L> HummockVersionCommon<SstableInfo, L> {
59    pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
60        self.levels
61            .get(&compaction_group_id)
62            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
63    }
64
65    pub fn get_compaction_group_levels_mut(
66        &mut self,
67        compaction_group_id: CompactionGroupId,
68    ) -> &mut Levels {
69        self.levels
70            .get_mut(&compaction_group_id)
71            .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
72    }
73
74    // only scan the sst infos from levels in the specified compaction group (without table change log)
75    pub fn get_sst_ids_by_group_id(
76        &self,
77        compaction_group_id: CompactionGroupId,
78    ) -> impl Iterator<Item = HummockSstableId> + '_ {
79        self.levels
80            .iter()
81            .filter_map(move |(cg_id, level)| {
82                if *cg_id == compaction_group_id {
83                    Some(level)
84                } else {
85                    None
86                }
87            })
88            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
89            .flat_map(|level| level.table_infos.iter())
90            .map(|s| s.sst_id)
91    }
92
93    pub fn level_iter<F: FnMut(&Level) -> bool>(
94        &self,
95        compaction_group_id: CompactionGroupId,
96        mut f: F,
97    ) {
98        if let Some(levels) = self.levels.get(&compaction_group_id) {
99            for sub_level in &levels.l0.sub_levels {
100                if !f(sub_level) {
101                    return;
102                }
103            }
104            for level in &levels.levels {
105                if !f(level) {
106                    return;
107                }
108            }
109        }
110    }
111
112    pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
113        // l0 is currently separated from all levels
114        self.levels
115            .get(&compaction_group_id)
116            .map(|group| group.levels.len() + 1)
117            .unwrap_or(0)
118    }
119
120    pub fn safe_epoch_table_watermarks(
121        &self,
122        existing_table_ids: &[u32],
123    ) -> BTreeMap<u32, TableWatermarks> {
124        safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
125    }
126}
127
128pub fn safe_epoch_table_watermarks_impl(
129    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
130    existing_table_ids: &[u32],
131) -> BTreeMap<u32, TableWatermarks> {
132    fn extract_single_table_watermark(
133        table_watermarks: &TableWatermarks,
134    ) -> Option<TableWatermarks> {
135        if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
136            Some(TableWatermarks {
137                watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
138                direction: table_watermarks.direction,
139                watermark_type: table_watermarks.watermark_type,
140            })
141        } else {
142            None
143        }
144    }
145    table_watermarks
146        .iter()
147        .filter_map(|(table_id, table_watermarks)| {
148            let u32_table_id = table_id.table_id();
149            if !existing_table_ids.contains(&u32_table_id) {
150                None
151            } else {
152                extract_single_table_watermark(table_watermarks)
153                    .map(|table_watermarks| (table_id.table_id, table_watermarks))
154            }
155        })
156        .collect()
157}
158
159pub fn safe_epoch_read_table_watermarks_impl(
160    safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
161) -> BTreeMap<TableId, ReadTableWatermark> {
162    safe_epoch_watermarks
163        .into_iter()
164        .map(|(table_id, watermarks)| {
165            assert_eq!(watermarks.watermarks.len(), 1);
166            let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
167            let mut vnode_watermark_map = BTreeMap::new();
168            for vnode_watermark in vnode_watermarks.iter() {
169                let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
170                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
171                    assert!(
172                        vnode_watermark_map
173                            .insert(vnode, watermark.clone())
174                            .is_none(),
175                        "duplicate table watermark on vnode {}",
176                        vnode.to_index()
177                    );
178                }
179            }
180            (
181                TableId::from(table_id),
182                ReadTableWatermark {
183                    direction: watermarks.direction,
184                    vnode_watermarks: vnode_watermark_map,
185                },
186            )
187        })
188        .collect()
189}
190
191impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
192    pub fn count_new_ssts_in_group_split(
193        &self,
194        parent_group_id: CompactionGroupId,
195        split_key: Bytes,
196    ) -> u64 {
197        self.levels
198            .get(&parent_group_id)
199            .map_or(0, |parent_levels| {
200                let l0 = &parent_levels.l0;
201                let mut split_count = 0;
202                for sub_level in &l0.sub_levels {
203                    assert!(!sub_level.table_infos.is_empty());
204
205                    if sub_level.level_type == PbLevelType::Overlapping {
206                        // TODO: use table_id / vnode / key_range filter
207                        split_count += sub_level
208                            .table_infos
209                            .iter()
210                            .map(|sst| {
211                                if let group_split::SstSplitType::Both =
212                                    group_split::need_to_split(sst, split_key.clone())
213                                {
214                                    2
215                                } else {
216                                    0
217                                }
218                            })
219                            .sum::<u64>();
220                        continue;
221                    }
222
223                    let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
224                    let sst = sub_level.table_infos.get(pos).unwrap();
225
226                    if let group_split::SstSplitType::Both =
227                        group_split::need_to_split(sst, split_key.clone())
228                    {
229                        split_count += 2;
230                    }
231                }
232
233                for level in &parent_levels.levels {
234                    if level.table_infos.is_empty() {
235                        continue;
236                    }
237                    let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
238                    let sst = level.table_infos.get(pos).unwrap();
239                    if let group_split::SstSplitType::Both =
240                        group_split::need_to_split(sst, split_key.clone())
241                    {
242                        split_count += 2;
243                    }
244                }
245
246                split_count
247            })
248    }
249
250    pub fn init_with_parent_group(
251        &mut self,
252        parent_group_id: CompactionGroupId,
253        group_id: CompactionGroupId,
254        member_table_ids: BTreeSet<StateTableId>,
255        new_sst_start_id: HummockSstableId,
256    ) {
257        let mut new_sst_id = new_sst_start_id;
258        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
259            if new_sst_start_id != 0 {
260                if cfg!(debug_assertions) {
261                    panic!(
262                        "non-zero sst start id {} for NewCompactionGroup",
263                        new_sst_start_id
264                    );
265                } else {
266                    warn!(
267                        %new_sst_start_id,
268                        "non-zero sst start id for NewCompactionGroup"
269                    );
270                }
271            }
272            return;
273        } else if !self.levels.contains_key(&parent_group_id) {
274            unreachable!(
275                "non-existing parent group id {} to init from",
276                parent_group_id
277            );
278        }
279        let [parent_levels, cur_levels] = self
280            .levels
281            .get_disjoint_mut([&parent_group_id, &group_id])
282            .map(|res| res.unwrap());
283        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
284        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
285        parent_levels.compaction_group_version_id += 1;
286        cur_levels.compaction_group_version_id += 1;
287        let l0 = &mut parent_levels.l0;
288        {
289            for sub_level in &mut l0.sub_levels {
290                let target_l0 = &mut cur_levels.l0;
291                // Remove SST from sub level may result in empty sub level. It will be purged
292                // whenever another compaction task is finished.
293                let insert_table_infos =
294                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
295                sub_level
296                    .table_infos
297                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
298                    .for_each(|sst_info| {
299                        sub_level.total_file_size -= sst_info.sst_size;
300                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
301                        l0.total_file_size -= sst_info.sst_size;
302                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
303                    });
304                if insert_table_infos.is_empty() {
305                    continue;
306                }
307                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
308                    Ok(idx) => {
309                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
310                    }
311                    Err(idx) => {
312                        insert_new_sub_level(
313                            target_l0,
314                            sub_level.sub_level_id,
315                            sub_level.level_type,
316                            insert_table_infos,
317                            Some(idx),
318                        );
319                    }
320                }
321            }
322
323            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
324        }
325        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
326            let insert_table_infos =
327                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
328            cur_levels.levels[idx].total_file_size += insert_table_infos
329                .iter()
330                .map(|sst| sst.sst_size)
331                .sum::<u64>();
332            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
333                .iter()
334                .map(|sst| sst.uncompressed_file_size)
335                .sum::<u64>();
336            cur_levels.levels[idx]
337                .table_infos
338                .extend(insert_table_infos);
339            cur_levels.levels[idx]
340                .table_infos
341                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
342            assert!(can_concat(&cur_levels.levels[idx].table_infos));
343            level
344                .table_infos
345                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
346                .for_each(|sst_info| {
347                    level.total_file_size -= sst_info.sst_size;
348                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
349                });
350        }
351
352        assert!(
353            parent_levels
354                .l0
355                .sub_levels
356                .iter()
357                .all(|level| !level.table_infos.is_empty())
358        );
359        assert!(
360            cur_levels
361                .l0
362                .sub_levels
363                .iter()
364                .all(|level| !level.table_infos.is_empty())
365        );
366    }
367
368    pub fn build_sst_delta_infos(
369        &self,
370        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
371    ) -> Vec<SstDeltaInfo> {
372        let mut infos = vec![];
373
374        // Skip trivial move delta for refiller
375        // The trivial move task only changes the position of the sst in the lsm, it does not modify the object information corresponding to the sst, and does not need to re-execute the refill.
376        if version_delta.trivial_move {
377            return infos;
378        }
379
380        for (group_id, group_deltas) in &version_delta.group_deltas {
381            let mut info = SstDeltaInfo::default();
382
383            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
384            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
385
386            // Build only if all deltas are intra level deltas.
387            if !group_deltas.group_deltas.iter().all(|delta| {
388                matches!(
389                    delta,
390                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
391                )
392            }) {
393                continue;
394            }
395
396            // TODO(MrCroxx): At most one insert delta is allowed here. It's okay for now with the
397            // current `hummock::manager::gen_version_delta` implementation. Better refactor the
398            // struct to reduce conventions.
399            for group_delta in &group_deltas.group_deltas {
400                match group_delta {
401                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
402                        if !inserted_table_infos.is_empty() {
403                            info.insert_sst_level = 0;
404                            info.insert_sst_infos
405                                .extend(inserted_table_infos.iter().cloned());
406                        }
407                    }
408                    GroupDeltaCommon::IntraLevel(intra_level) => {
409                        if !intra_level.inserted_table_infos.is_empty() {
410                            info.insert_sst_level = intra_level.level_idx;
411                            info.insert_sst_infos
412                                .extend(intra_level.inserted_table_infos.iter().cloned());
413                        }
414                        if !intra_level.removed_table_ids.is_empty() {
415                            for id in &intra_level.removed_table_ids {
416                                if intra_level.level_idx == 0 {
417                                    removed_l0_ssts.insert(*id);
418                                } else {
419                                    removed_ssts
420                                        .entry(intra_level.level_idx)
421                                        .or_default()
422                                        .insert(*id);
423                                }
424                            }
425                        }
426                    }
427                    GroupDeltaCommon::GroupConstruct(_)
428                    | GroupDeltaCommon::GroupDestroy(_)
429                    | GroupDeltaCommon::GroupMerge(_) => {}
430                }
431            }
432
433            let group = self.levels.get(group_id).unwrap();
434            for l0_sub_level in &group.level0().sub_levels {
435                for sst_info in &l0_sub_level.table_infos {
436                    if removed_l0_ssts.remove(&sst_info.sst_id) {
437                        info.delete_sst_object_ids.push(sst_info.object_id);
438                    }
439                }
440            }
441            for level in &group.levels {
442                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
443                    for sst_info in &level.table_infos {
444                        if removed_level_ssts.remove(&sst_info.sst_id) {
445                            info.delete_sst_object_ids.push(sst_info.object_id);
446                        }
447                    }
448                    if !removed_level_ssts.is_empty() {
449                        tracing::error!(
450                            "removed_level_ssts is not empty: {:?}",
451                            removed_level_ssts,
452                        );
453                    }
454                    debug_assert!(removed_level_ssts.is_empty());
455                }
456            }
457
458            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
459                tracing::error!(
460                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
461                    removed_l0_ssts,
462                    removed_ssts
463                );
464            }
465            debug_assert!(removed_l0_ssts.is_empty());
466            debug_assert!(removed_ssts.is_empty());
467
468            infos.push(info);
469        }
470
471        infos
472    }
473
474    pub fn apply_version_delta(
475        &mut self,
476        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
477    ) {
478        assert_eq!(self.id, version_delta.prev_id);
479
480        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
481            &version_delta.state_table_info_delta,
482            &version_delta.removed_table_ids,
483        );
484
485        #[expect(deprecated)]
486        {
487            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
488                is_commit_epoch = true;
489                tracing::trace!(
490                    "max committed epoch bumped but no table committed epoch is changed"
491                );
492            }
493        }
494
495        // apply to `levels`, which is different compaction groups
496        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
497            let mut is_applied_l0_compact = false;
498            for group_delta in &group_deltas.group_deltas {
499                match group_delta {
500                    GroupDeltaCommon::GroupConstruct(group_construct) => {
501                        let mut new_levels = build_initial_compaction_group_levels(
502                            *compaction_group_id,
503                            group_construct.get_group_config().unwrap(),
504                        );
505                        let parent_group_id = group_construct.parent_group_id;
506                        new_levels.parent_group_id = parent_group_id;
507                        #[expect(deprecated)]
508                        // for backward-compatibility of previous hummock version delta
509                        new_levels
510                            .member_table_ids
511                            .clone_from(&group_construct.table_ids);
512                        self.levels.insert(*compaction_group_id, new_levels);
513                        let member_table_ids = if group_construct.version()
514                            >= CompatibilityVersion::NoMemberTableIds
515                        {
516                            self.state_table_info
517                                .compaction_group_member_table_ids(*compaction_group_id)
518                                .iter()
519                                .map(|table_id| table_id.table_id)
520                                .collect()
521                        } else {
522                            #[expect(deprecated)]
523                            // for backward-compatibility of previous hummock version delta
524                            BTreeSet::from_iter(group_construct.table_ids.clone())
525                        };
526
527                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
528                            let split_key = if group_construct.split_key.is_some() {
529                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
530                            } else {
531                                None
532                            };
533                            self.init_with_parent_group_v2(
534                                parent_group_id,
535                                *compaction_group_id,
536                                group_construct.get_new_sst_start_id().into(),
537                                split_key.clone(),
538                            );
539                        } else {
540                            // for backward-compatibility of previous hummock version delta
541                            self.init_with_parent_group(
542                                parent_group_id,
543                                *compaction_group_id,
544                                member_table_ids,
545                                group_construct.get_new_sst_start_id().into(),
546                            );
547                        }
548                    }
549                    GroupDeltaCommon::GroupMerge(group_merge) => {
550                        tracing::info!(
551                            "group_merge left {:?} right {:?}",
552                            group_merge.left_group_id,
553                            group_merge.right_group_id
554                        );
555                        self.merge_compaction_group(
556                            group_merge.left_group_id,
557                            group_merge.right_group_id,
558                        )
559                    }
560                    GroupDeltaCommon::IntraLevel(level_delta) => {
561                        let levels =
562                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
563                                panic!("compaction group {} does not exist", compaction_group_id)
564                            });
565                        if is_commit_epoch {
566                            assert!(
567                                level_delta.removed_table_ids.is_empty(),
568                                "no sst should be deleted when committing an epoch"
569                            );
570
571                            let IntraLevelDelta {
572                                level_idx,
573                                l0_sub_level_id,
574                                inserted_table_infos,
575                                ..
576                            } = level_delta;
577                            {
578                                assert_eq!(
579                                    *level_idx, 0,
580                                    "we should only add to L0 when we commit an epoch."
581                                );
582                                if !inserted_table_infos.is_empty() {
583                                    insert_new_sub_level(
584                                        &mut levels.l0,
585                                        *l0_sub_level_id,
586                                        PbLevelType::Overlapping,
587                                        inserted_table_infos.clone(),
588                                        None,
589                                    );
590                                }
591                            }
592                        } else {
593                            // The delta is caused by compaction.
594                            levels.apply_compact_ssts(
595                                level_delta,
596                                self.state_table_info
597                                    .compaction_group_member_table_ids(*compaction_group_id),
598                            );
599                            if level_delta.level_idx == 0 {
600                                is_applied_l0_compact = true;
601                            }
602                        }
603                    }
604                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
605                        let levels =
606                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
607                                panic!("compaction group {} does not exist", compaction_group_id)
608                            });
609                        assert!(is_commit_epoch);
610
611                        if !inserted_table_infos.is_empty() {
612                            let next_l0_sub_level_id = levels
613                                .l0
614                                .sub_levels
615                                .last()
616                                .map(|level| level.sub_level_id + 1)
617                                .unwrap_or(1);
618
619                            insert_new_sub_level(
620                                &mut levels.l0,
621                                next_l0_sub_level_id,
622                                PbLevelType::Overlapping,
623                                inserted_table_infos.clone(),
624                                None,
625                            );
626                        }
627                    }
628                    GroupDeltaCommon::GroupDestroy(_) => {
629                        self.levels.remove(compaction_group_id);
630                    }
631                }
632            }
633            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
634            {
635                levels.post_apply_l0_compact();
636            }
637        }
638        self.id = version_delta.id;
639        #[expect(deprecated)]
640        {
641            self.max_committed_epoch = version_delta.max_committed_epoch;
642        }
643
644        // apply to table watermark
645
646        // Store the table watermarks that needs to be updated. None means to remove the table watermark of the table id
647        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
648            HashMap::new();
649
650        // apply to table watermark
651        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
652            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
653                if version_delta.removed_table_ids.contains(table_id) {
654                    modified_table_watermarks.insert(*table_id, None);
655                } else {
656                    let mut current_table_watermarks = (**current_table_watermarks).clone();
657                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
658                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
659                }
660            } else {
661                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
662            }
663        }
664        for (table_id, table_watermarks) in &self.table_watermarks {
665            let safe_epoch = if let Some(state_table_info) =
666                self.state_table_info.info().get(table_id)
667                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
668                && state_table_info.committed_epoch > *oldest_epoch
669            {
670                // safe epoch has progressed, need further clear.
671                state_table_info.committed_epoch
672            } else {
673                // safe epoch not progressed or the table has been removed. No need to truncate
674                continue;
675            };
676            let table_watermarks = modified_table_watermarks
677                .entry(*table_id)
678                .or_insert_with(|| Some((**table_watermarks).clone()));
679            if let Some(table_watermarks) = table_watermarks {
680                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
681            }
682        }
683        // apply the staging table watermark to hummock version
684        for (table_id, table_watermarks) in modified_table_watermarks {
685            if let Some(table_watermarks) = table_watermarks {
686                self.table_watermarks
687                    .insert(table_id, Arc::new(table_watermarks));
688            } else {
689                self.table_watermarks.remove(&table_id);
690            }
691        }
692
693        // apply to table change log
694        Self::apply_change_log_delta(
695            &mut self.table_change_log,
696            &version_delta.change_log_delta,
697            &version_delta.removed_table_ids,
698            &version_delta.state_table_info_delta,
699            &changed_table_info,
700        );
701
702        // apply to vector index
703        apply_vector_index_delta(
704            &mut self.vector_indexes,
705            &version_delta.vector_index_delta,
706            &version_delta.removed_table_ids,
707        );
708    }
709
710    pub fn apply_change_log_delta<T: Clone>(
711        table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
712        change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
713        removed_table_ids: &HashSet<TableId>,
714        state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
715        changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
716    ) {
717        for (table_id, change_log_delta) in change_log_delta {
718            let new_change_log = &change_log_delta.new_log;
719            match table_change_log.entry(*table_id) {
720                Entry::Occupied(entry) => {
721                    let change_log = entry.into_mut();
722                    change_log.add_change_log(new_change_log.clone());
723                }
724                Entry::Vacant(entry) => {
725                    entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
726                }
727            };
728        }
729
730        // If a table has no new change log entry (even an empty one), it means we have stopped maintained
731        // the change log for the table, and then we will remove the table change log.
732        // The table change log will also be removed when the table id is removed.
733        table_change_log.retain(|table_id, _| {
734            if removed_table_ids.contains(table_id) {
735                return false;
736            }
737            if let Some(table_info_delta) = state_table_info_delta.get(table_id)
738                && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
739                // the table exists previously, and its committed epoch has progressed.
740            } else {
741                // otherwise, the table change log should be kept anyway
742                return true;
743            }
744            let contains = change_log_delta.contains_key(table_id);
745            if !contains {
746                warn!(
747                        ?table_id,
748                        "table change log dropped due to no further change log at newly committed epoch",
749                    );
750            }
751            contains
752        });
753
754        // truncate the remaining table change log
755        for (table_id, change_log_delta) in change_log_delta {
756            if let Some(change_log) = table_change_log.get_mut(table_id) {
757                change_log.truncate(change_log_delta.truncate_epoch);
758            }
759        }
760    }
761
762    pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
763        let mut ret: BTreeMap<_, _> = BTreeMap::new();
764        for (compaction_group_id, group) in &self.levels {
765            let mut levels = vec![];
766            levels.extend(group.l0.sub_levels.iter());
767            levels.extend(group.levels.iter());
768            for level in levels {
769                for table_info in &level.table_infos {
770                    if table_info.sst_id.inner() == table_info.object_id.inner() {
771                        continue;
772                    }
773                    let object_id = table_info.object_id;
774                    let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
775                    entry
776                        .entry(*compaction_group_id)
777                        .or_default()
778                        .push(table_info.sst_id)
779                }
780            }
781        }
782        ret
783    }
784
785    pub fn merge_compaction_group(
786        &mut self,
787        left_group_id: CompactionGroupId,
788        right_group_id: CompactionGroupId,
789    ) {
790        // Double check
791        let left_group_id_table_ids = self
792            .state_table_info
793            .compaction_group_member_table_ids(left_group_id)
794            .iter()
795            .map(|table_id| table_id.table_id);
796        let right_group_id_table_ids = self
797            .state_table_info
798            .compaction_group_member_table_ids(right_group_id)
799            .iter()
800            .map(|table_id| table_id.table_id);
801
802        assert!(
803            left_group_id_table_ids
804                .chain(right_group_id_table_ids)
805                .is_sorted()
806        );
807
808        let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
809        let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
810            panic!(
811                "compaction group should exist right {} all {:?}",
812                right_group_id, total_cg
813            )
814        });
815
816        let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
817            panic!(
818                "compaction group should exist left {} all {:?}",
819                left_group_id, total_cg
820            )
821        });
822
823        group_split::merge_levels(left_levels, right_levels);
824    }
825
826    pub fn init_with_parent_group_v2(
827        &mut self,
828        parent_group_id: CompactionGroupId,
829        group_id: CompactionGroupId,
830        new_sst_start_id: HummockSstableId,
831        split_key: Option<Bytes>,
832    ) {
833        let mut new_sst_id = new_sst_start_id;
834        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId {
835            if new_sst_start_id != 0 {
836                if cfg!(debug_assertions) {
837                    panic!(
838                        "non-zero sst start id {} for NewCompactionGroup",
839                        new_sst_start_id
840                    );
841                } else {
842                    warn!(
843                        %new_sst_start_id,
844                        "non-zero sst start id for NewCompactionGroup"
845                    );
846                }
847            }
848            return;
849        } else if !self.levels.contains_key(&parent_group_id) {
850            unreachable!(
851                "non-existing parent group id {} to init from (V2)",
852                parent_group_id
853            );
854        }
855
856        let [parent_levels, cur_levels] = self
857            .levels
858            .get_disjoint_mut([&parent_group_id, &group_id])
859            .map(|res| res.unwrap());
860        // After certain compaction group operation, e.g. split, any ongoing compaction tasks created prior to that should be rejected due to expiration.
861        // By incrementing the compaction_group_version_id of the compaction group, and comparing it with the one recorded in compaction task, expired compaction tasks can be identified.
862        parent_levels.compaction_group_version_id += 1;
863        cur_levels.compaction_group_version_id += 1;
864
865        let l0 = &mut parent_levels.l0;
866        {
867            for sub_level in &mut l0.sub_levels {
868                let target_l0 = &mut cur_levels.l0;
869                // Remove SST from sub level may result in empty sub level. It will be purged
870                // whenever another compaction task is finished.
871                let insert_table_infos = if let Some(split_key) = &split_key {
872                    group_split::split_sst_info_for_level_v2(
873                        sub_level,
874                        &mut new_sst_id,
875                        split_key.clone(),
876                    )
877                } else {
878                    vec![]
879                };
880
881                if insert_table_infos.is_empty() {
882                    continue;
883                }
884
885                sub_level
886                    .table_infos
887                    .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
888                    .for_each(|sst_info| {
889                        sub_level.total_file_size -= sst_info.sst_size;
890                        sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
891                        l0.total_file_size -= sst_info.sst_size;
892                        l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
893                    });
894                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
895                    Ok(idx) => {
896                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
897                    }
898                    Err(idx) => {
899                        insert_new_sub_level(
900                            target_l0,
901                            sub_level.sub_level_id,
902                            sub_level.level_type,
903                            insert_table_infos,
904                            Some(idx),
905                        );
906                    }
907                }
908            }
909
910            l0.sub_levels.retain(|level| !level.table_infos.is_empty());
911        }
912
913        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
914            let insert_table_infos = if let Some(split_key) = &split_key {
915                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
916            } else {
917                vec![]
918            };
919
920            if insert_table_infos.is_empty() {
921                continue;
922            }
923
924            cur_levels.levels[idx].total_file_size += insert_table_infos
925                .iter()
926                .map(|sst| sst.sst_size)
927                .sum::<u64>();
928            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
929                .iter()
930                .map(|sst| sst.uncompressed_file_size)
931                .sum::<u64>();
932            cur_levels.levels[idx]
933                .table_infos
934                .extend(insert_table_infos);
935            cur_levels.levels[idx]
936                .table_infos
937                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
938            assert!(can_concat(&cur_levels.levels[idx].table_infos));
939            level
940                .table_infos
941                .extract_if(.., |sst_info| sst_info.table_ids.is_empty())
942                .for_each(|sst_info| {
943                    level.total_file_size -= sst_info.sst_size;
944                    level.uncompressed_file_size -= sst_info.uncompressed_file_size;
945                });
946        }
947
948        assert!(
949            parent_levels
950                .l0
951                .sub_levels
952                .iter()
953                .all(|level| !level.table_infos.is_empty())
954        );
955        assert!(
956            cur_levels
957                .l0
958                .sub_levels
959                .iter()
960                .all(|level| !level.table_infos.is_empty())
961        );
962    }
963}
964
965impl<T> HummockVersionCommon<T>
966where
967    T: SstableIdReader + ObjectIdReader,
968{
969    pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
970        self.get_sst_infos(exclude_change_log)
971            .map(|s| s.object_id())
972            .collect()
973    }
974
975    pub fn get_object_ids(
976        &self,
977        exclude_change_log: bool,
978    ) -> impl Iterator<Item = HummockObjectId> + '_ {
979        // DO NOT REMOVE THIS LINE
980        // This is to ensure that when adding new variant to `HummockObjectId`,
981        // the compiler will warn us if we forget to handle it here.
982        match HummockObjectId::Sstable(0.into()) {
983            HummockObjectId::Sstable(_) => {}
984            HummockObjectId::VectorFile(_) => {}
985        };
986        self.get_sst_infos(exclude_change_log)
987            .map(|s| HummockObjectId::Sstable(s.object_id()))
988            .chain(
989                self.vector_indexes
990                    .values()
991                    .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
992            )
993    }
994
995    pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
996        self.get_sst_infos(exclude_change_log)
997            .map(|s| s.sst_id())
998            .collect()
999    }
1000
1001    pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
1002        let may_table_change_log = if exclude_change_log {
1003            None
1004        } else {
1005            Some(self.table_change_log.values())
1006        };
1007        self.get_combined_levels()
1008            .flat_map(|level| level.table_infos.iter())
1009            .chain(
1010                may_table_change_log
1011                    .map(|v| {
1012                        v.flat_map(|table_change_log| {
1013                            table_change_log.iter().flat_map(|epoch_change_log| {
1014                                epoch_change_log
1015                                    .old_value
1016                                    .iter()
1017                                    .chain(epoch_change_log.new_value.iter())
1018                            })
1019                        })
1020                    })
1021                    .into_iter()
1022                    .flatten(),
1023            )
1024    }
1025}
1026
1027impl Levels {
1028    pub(crate) fn apply_compact_ssts(
1029        &mut self,
1030        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
1031        member_table_ids: &BTreeSet<TableId>,
1032    ) {
1033        let IntraLevelDeltaCommon {
1034            level_idx,
1035            l0_sub_level_id,
1036            inserted_table_infos: insert_table_infos,
1037            vnode_partition_count,
1038            removed_table_ids: delete_sst_ids_set,
1039            compaction_group_version_id,
1040        } = level_delta;
1041        let new_vnode_partition_count = *vnode_partition_count;
1042
1043        if is_compaction_task_expired(
1044            self.compaction_group_version_id,
1045            *compaction_group_version_id,
1046        ) {
1047            warn!(
1048                current_compaction_group_version_id = self.compaction_group_version_id,
1049                delta_compaction_group_version_id = compaction_group_version_id,
1050                level_idx,
1051                l0_sub_level_id,
1052                insert_table_infos = ?insert_table_infos
1053                    .iter()
1054                    .map(|sst| (sst.sst_id, sst.object_id))
1055                    .collect_vec(),
1056                ?delete_sst_ids_set,
1057                "This VersionDelta may be committed by an expired compact task. Please check it."
1058            );
1059            return;
1060        }
1061        if !delete_sst_ids_set.is_empty() {
1062            if *level_idx == 0 {
1063                for level in &mut self.l0.sub_levels {
1064                    level_delete_ssts(level, delete_sst_ids_set);
1065                }
1066            } else {
1067                let idx = *level_idx as usize - 1;
1068                level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set);
1069            }
1070        }
1071
1072        if !insert_table_infos.is_empty() {
1073            let insert_sst_level_id = *level_idx;
1074            let insert_sub_level_id = *l0_sub_level_id;
1075            if insert_sst_level_id == 0 {
1076                let l0 = &mut self.l0;
1077                let index = l0
1078                    .sub_levels
1079                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
1080                assert!(
1081                    index < l0.sub_levels.len()
1082                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
1083                    "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
1084                    insert_sub_level_id,
1085                    delete_sst_ids_set,
1086                    l0.sub_levels
1087                        .iter()
1088                        .map(|level| level.sub_level_id)
1089                        .collect_vec()
1090                );
1091                if l0.sub_levels[index].table_infos.is_empty()
1092                    && member_table_ids.len() == 1
1093                    && insert_table_infos.iter().all(|sst| {
1094                        sst.table_ids.len() == 1
1095                            && sst.table_ids[0]
1096                                == member_table_ids.iter().next().expect("non-empty").table_id
1097                    })
1098                {
1099                    // Only change vnode_partition_count for group which has only one state-table.
1100                    // Only change vnode_partition_count for level which update all sst files in this compact task.
1101                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
1102                }
1103                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
1104            } else {
1105                let idx = insert_sst_level_id as usize - 1;
1106                if self.levels[idx].table_infos.is_empty()
1107                    && insert_table_infos
1108                        .iter()
1109                        .all(|sst| sst.table_ids.len() == 1)
1110                {
1111                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
1112                } else if self.levels[idx].vnode_partition_count != 0
1113                    && new_vnode_partition_count == 0
1114                    && member_table_ids.len() > 1
1115                {
1116                    self.levels[idx].vnode_partition_count = 0;
1117                }
1118                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
1119            }
1120        }
1121    }
1122
1123    pub(crate) fn post_apply_l0_compact(&mut self) {
1124        {
1125            self.l0
1126                .sub_levels
1127                .retain(|level| !level.table_infos.is_empty());
1128            self.l0.total_file_size = self
1129                .l0
1130                .sub_levels
1131                .iter()
1132                .map(|level| level.total_file_size)
1133                .sum::<u64>();
1134            self.l0.uncompressed_file_size = self
1135                .l0
1136                .sub_levels
1137                .iter()
1138                .map(|level| level.uncompressed_file_size)
1139                .sum::<u64>();
1140        }
1141    }
1142}
1143
1144impl<T, L> HummockVersionCommon<T, L> {
1145    pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1146        self.levels
1147            .values()
1148            .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1149    }
1150}
1151
1152pub fn build_initial_compaction_group_levels(
1153    group_id: CompactionGroupId,
1154    compaction_config: &CompactionConfig,
1155) -> Levels {
1156    let mut levels = vec![];
1157    for l in 0..compaction_config.get_max_level() {
1158        levels.push(Level {
1159            level_idx: (l + 1) as u32,
1160            level_type: PbLevelType::Nonoverlapping,
1161            table_infos: vec![],
1162            total_file_size: 0,
1163            sub_level_id: 0,
1164            uncompressed_file_size: 0,
1165            vnode_partition_count: 0,
1166        });
1167    }
1168    #[expect(deprecated)] // for backward-compatibility of previous hummock version delta
1169    Levels {
1170        levels,
1171        l0: OverlappingLevel {
1172            sub_levels: vec![],
1173            total_file_size: 0,
1174            uncompressed_file_size: 0,
1175        },
1176        group_id,
1177        parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
1178        member_table_ids: vec![],
1179        compaction_group_version_id: 0,
1180    }
1181}
1182
1183fn split_sst_info_for_level(
1184    member_table_ids: &BTreeSet<u32>,
1185    level: &mut Level,
1186    new_sst_id: &mut HummockSstableId,
1187) -> Vec<SstableInfo> {
1188    // Remove SST from sub level may result in empty sub level. It will be purged
1189    // whenever another compaction task is finished.
1190    let mut insert_table_infos = vec![];
1191    for sst_info in &mut level.table_infos {
1192        let removed_table_ids = sst_info
1193            .table_ids
1194            .iter()
1195            .filter(|table_id| member_table_ids.contains(table_id))
1196            .cloned()
1197            .collect_vec();
1198        let sst_size = sst_info.sst_size;
1199        if sst_size / 2 == 0 {
1200            tracing::warn!(
1201                id = %sst_info.sst_id,
1202                object_id = %sst_info.object_id,
1203                sst_size = sst_info.sst_size,
1204                file_size = sst_info.file_size,
1205                "Sstable sst_size is under expected",
1206            );
1207        };
1208        if !removed_table_ids.is_empty() {
1209            let (modified_sst, branch_sst) = split_sst_with_table_ids(
1210                sst_info,
1211                new_sst_id,
1212                sst_size / 2,
1213                sst_size / 2,
1214                member_table_ids.iter().cloned().collect_vec(),
1215            );
1216            *sst_info = modified_sst;
1217            insert_table_infos.push(branch_sst);
1218        }
1219    }
1220    insert_table_infos
1221}
1222
1223/// Gets all compaction group ids.
1224pub fn get_compaction_group_ids(
1225    version: &HummockVersion,
1226) -> impl Iterator<Item = CompactionGroupId> + '_ {
1227    version.levels.keys().cloned()
1228}
1229
1230pub fn get_table_compaction_group_id_mapping(
1231    version: &HummockVersion,
1232) -> HashMap<StateTableId, CompactionGroupId> {
1233    version
1234        .state_table_info
1235        .info()
1236        .iter()
1237        .map(|(table_id, info)| (table_id.table_id, info.compaction_group_id))
1238        .collect()
1239}
1240
1241/// Gets all SSTs in `group_id`
1242pub fn get_compaction_group_ssts(
1243    version: &HummockVersion,
1244    group_id: CompactionGroupId,
1245) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1246    let group_levels = version.get_compaction_group_levels(group_id);
1247    group_levels
1248        .l0
1249        .sub_levels
1250        .iter()
1251        .rev()
1252        .chain(group_levels.levels.iter())
1253        .flat_map(|level| {
1254            level
1255                .table_infos
1256                .iter()
1257                .map(|table_info| (table_info.object_id, table_info.sst_id))
1258        })
1259}
1260
1261pub fn new_sub_level(
1262    sub_level_id: u64,
1263    level_type: PbLevelType,
1264    table_infos: Vec<SstableInfo>,
1265) -> Level {
1266    if level_type == PbLevelType::Nonoverlapping {
1267        debug_assert!(
1268            can_concat(&table_infos),
1269            "sst of non-overlapping level is not concat-able: {:?}",
1270            table_infos
1271        );
1272    }
1273    let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1274    let uncompressed_file_size = table_infos
1275        .iter()
1276        .map(|table| table.uncompressed_file_size)
1277        .sum();
1278    Level {
1279        level_idx: 0,
1280        level_type,
1281        table_infos,
1282        total_file_size,
1283        sub_level_id,
1284        uncompressed_file_size,
1285        vnode_partition_count: 0,
1286    }
1287}
1288
1289pub fn add_ssts_to_sub_level(
1290    l0: &mut OverlappingLevel,
1291    sub_level_idx: usize,
1292    insert_table_infos: Vec<SstableInfo>,
1293) {
1294    insert_table_infos.iter().for_each(|sst| {
1295        l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1296        l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1297        l0.total_file_size += sst.sst_size;
1298        l0.uncompressed_file_size += sst.uncompressed_file_size;
1299    });
1300    l0.sub_levels[sub_level_idx]
1301        .table_infos
1302        .extend(insert_table_infos);
1303    if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1304        l0.sub_levels[sub_level_idx]
1305            .table_infos
1306            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1307        assert!(
1308            can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1309            "sstable ids: {:?}",
1310            l0.sub_levels[sub_level_idx]
1311                .table_infos
1312                .iter()
1313                .map(|sst| sst.sst_id)
1314                .collect_vec()
1315        );
1316    }
1317}
1318
1319/// `None` value of `sub_level_insert_hint` means append.
1320pub fn insert_new_sub_level(
1321    l0: &mut OverlappingLevel,
1322    insert_sub_level_id: u64,
1323    level_type: PbLevelType,
1324    insert_table_infos: Vec<SstableInfo>,
1325    sub_level_insert_hint: Option<usize>,
1326) {
1327    if insert_sub_level_id == u64::MAX {
1328        return;
1329    }
1330    let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1331        insert_pos
1332    } else {
1333        if let Some(newest_level) = l0.sub_levels.last() {
1334            assert!(
1335                newest_level.sub_level_id < insert_sub_level_id,
1336                "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1337                newest_level.sub_level_id,
1338                insert_sub_level_id,
1339                l0,
1340            );
1341        }
1342        l0.sub_levels.len()
1343    };
1344    #[cfg(debug_assertions)]
1345    {
1346        if insert_pos > 0
1347            && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1348        {
1349            debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1350        }
1351        if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1352            debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1353        }
1354    }
1355    // All files will be committed in one new Overlapping sub-level and become
1356    // Nonoverlapping  after at least one compaction.
1357    let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1358    l0.total_file_size += level.total_file_size;
1359    l0.uncompressed_file_size += level.uncompressed_file_size;
1360    l0.sub_levels.insert(insert_pos, level);
1361}
1362
1363/// Delete sstables if the table id is in the id set.
1364///
1365/// Return `true` if some sst is deleted, and `false` is the deletion is trivial
1366fn level_delete_ssts(
1367    operand: &mut Level,
1368    delete_sst_ids_superset: &HashSet<HummockSstableId>,
1369) -> bool {
1370    let original_len = operand.table_infos.len();
1371    operand
1372        .table_infos
1373        .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id));
1374    operand.total_file_size = operand
1375        .table_infos
1376        .iter()
1377        .map(|table| table.sst_size)
1378        .sum::<u64>();
1379    operand.uncompressed_file_size = operand
1380        .table_infos
1381        .iter()
1382        .map(|table| table.uncompressed_file_size)
1383        .sum::<u64>();
1384    original_len != operand.table_infos.len()
1385}
1386
1387fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1388    fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1389        format!(
1390            "sstable ids: {:?}",
1391            ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1392        )
1393    }
1394    operand.total_file_size += insert_table_infos
1395        .iter()
1396        .map(|sst| sst.sst_size)
1397        .sum::<u64>();
1398    operand.uncompressed_file_size += insert_table_infos
1399        .iter()
1400        .map(|sst| sst.uncompressed_file_size)
1401        .sum::<u64>();
1402    if operand.level_type == PbLevelType::Overlapping {
1403        operand.level_type = PbLevelType::Nonoverlapping;
1404        operand
1405            .table_infos
1406            .extend(insert_table_infos.iter().cloned());
1407        operand
1408            .table_infos
1409            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1410        assert!(
1411            can_concat(&operand.table_infos),
1412            "{}",
1413            display_sstable_infos(&operand.table_infos)
1414        );
1415    } else if !insert_table_infos.is_empty() {
1416        let sorted_insert: Vec<_> = insert_table_infos
1417            .iter()
1418            .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1419            .cloned()
1420            .collect();
1421        let first = &sorted_insert[0];
1422        let last = &sorted_insert[sorted_insert.len() - 1];
1423        let pos = operand
1424            .table_infos
1425            .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1426        if pos >= operand.table_infos.len()
1427            || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1428        {
1429            operand.table_infos.splice(pos..pos, sorted_insert);
1430            // Validate the inserted SST batch along with the two SSTs that precede and follow it.
1431            let validate_range = operand
1432                .table_infos
1433                .iter()
1434                .skip(pos.saturating_sub(1))
1435                .take(insert_table_infos.len() + 2)
1436                .collect_vec();
1437            assert!(
1438                can_concat(&validate_range),
1439                "{}",
1440                display_sstable_infos(&validate_range),
1441            );
1442        } else {
1443            // If this branch is reached, it indicates some unexpected behavior in compaction.
1444            // Here we issue a warning and fall back to insert one by one.
1445            warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1446            for i in insert_table_infos {
1447                let pos = operand
1448                    .table_infos
1449                    .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1450                operand.table_infos.insert(pos, i.clone());
1451            }
1452            assert!(
1453                can_concat(&operand.table_infos),
1454                "{}",
1455                display_sstable_infos(&operand.table_infos)
1456            );
1457        }
1458    }
1459}
1460
1461pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1462    // DO NOT REMOVE THIS LINE
1463    // This is to ensure that when adding new variant to `HummockObjectId`,
1464    // the compiler will warn us if we forget to handle it here.
1465    match HummockObjectId::Sstable(0.into()) {
1466        HummockObjectId::Sstable(_) => {}
1467        HummockObjectId::VectorFile(_) => {}
1468    };
1469    version
1470        .levels
1471        .values()
1472        .flat_map(|cg| {
1473            cg.level0()
1474                .sub_levels
1475                .iter()
1476                .chain(cg.levels.iter())
1477                .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1478        })
1479        .chain(version.table_change_log.values().flat_map(|c| {
1480            c.iter().flat_map(|l| {
1481                l.old_value
1482                    .iter()
1483                    .chain(l.new_value.iter())
1484                    .map(|t| (t.object_id, t.file_size))
1485            })
1486        }))
1487        .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1488        .chain(
1489            version
1490                .vector_indexes
1491                .values()
1492                .flat_map(|index| index.get_objects()),
1493        )
1494        .collect()
1495}
1496
1497/// Verify the validity of a `HummockVersion` and return a list of violations if any.
1498/// Currently this method is only used by risectl validate-version.
1499pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1500    let mut res = Vec::new();
1501    // Ensure each table maps to only one compaction group
1502    for (group_id, levels) in &version.levels {
1503        // Ensure compaction group id matches
1504        if levels.group_id != *group_id {
1505            res.push(format!(
1506                "GROUP {}: inconsistent group id {} in Levels",
1507                group_id, levels.group_id
1508            ));
1509        }
1510
1511        let validate_level = |group: CompactionGroupId,
1512                              expected_level_idx: u32,
1513                              level: &Level,
1514                              res: &mut Vec<String>| {
1515            let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1516            if level.level_idx == 0 {
1517                level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1518                // Ensure sub-level is not empty
1519                if level.table_infos.is_empty() {
1520                    res.push(format!("{}: empty level", level_identifier));
1521                }
1522            } else if level.level_type != PbLevelType::Nonoverlapping {
1523                // Ensure non-L0 level is non-overlapping level
1524                res.push(format!(
1525                    "{}: level type {:?} is not non-overlapping",
1526                    level_identifier, level.level_type
1527                ));
1528            }
1529
1530            // Ensure level idx matches
1531            if level.level_idx != expected_level_idx {
1532                res.push(format!(
1533                    "{}: mismatched level idx {}",
1534                    level_identifier, expected_level_idx
1535                ));
1536            }
1537
1538            let mut prev_table_info: Option<&SstableInfo> = None;
1539            for table_info in &level.table_infos {
1540                // Ensure table_ids are sorted and unique
1541                if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1542                    res.push(format!(
1543                        "{} SST {}: table_ids not sorted",
1544                        level_identifier, table_info.object_id
1545                    ));
1546                }
1547
1548                // Ensure SSTs in non-overlapping level have non-overlapping key range
1549                if level.level_type == PbLevelType::Nonoverlapping {
1550                    if let Some(prev) = prev_table_info.take()
1551                        && prev
1552                            .key_range
1553                            .compare_right_with(&table_info.key_range.left)
1554                            != Ordering::Less
1555                    {
1556                        res.push(format!(
1557                            "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1558                            level_identifier, table_info.object_id, prev, table_info
1559                        ));
1560                    }
1561                    let _ = prev_table_info.insert(table_info);
1562                }
1563            }
1564        };
1565
1566        let l0 = &levels.l0;
1567        let mut prev_sub_level_id = u64::MAX;
1568        for sub_level in &l0.sub_levels {
1569            // Ensure sub_level_id is sorted and unique
1570            if sub_level.sub_level_id >= prev_sub_level_id {
1571                res.push(format!(
1572                    "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1573                    group_id, sub_level.level_idx, prev_sub_level_id
1574                ));
1575            }
1576            prev_sub_level_id = sub_level.sub_level_id;
1577
1578            validate_level(*group_id, 0, sub_level, &mut res);
1579        }
1580
1581        for idx in 1..=levels.levels.len() {
1582            validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1583        }
1584    }
1585    res
1586}
1587
1588#[cfg(test)]
1589mod tests {
1590    use std::collections::{HashMap, HashSet};
1591
1592    use bytes::Bytes;
1593    use risingwave_common::catalog::TableId;
1594    use risingwave_common::hash::VirtualNode;
1595    use risingwave_common::util::epoch::test_epoch;
1596    use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1597
1598    use super::group_split;
1599    use crate::HummockVersionId;
1600    use crate::compaction_group::group_split::*;
1601    use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1602    use crate::key::{FullKey, gen_key_from_str};
1603    use crate::key_range::KeyRange;
1604    use crate::level::{Level, Levels, OverlappingLevel};
1605    use crate::sstable_info::{SstableInfo, SstableInfoInner};
1606    use crate::version::{
1607        GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1608    };
1609
1610    fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1611        gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1612    }
1613
1614    fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1615        let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1616        let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1617        let full_key_l = FullKey::for_test(
1618            TableId::new(*table_ids.first().unwrap()),
1619            table_key_l,
1620            epoch,
1621        )
1622        .encode();
1623        let full_key_r =
1624            FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1625                .encode();
1626
1627        SstableInfoInner {
1628            sst_id: sst_id.into(),
1629            key_range: KeyRange {
1630                left: full_key_l.into(),
1631                right: full_key_r.into(),
1632                right_exclusive: false,
1633            },
1634            table_ids,
1635            object_id: sst_id.into(),
1636            min_epoch: 20,
1637            max_epoch: 20,
1638            file_size: 100,
1639            sst_size: 100,
1640            ..Default::default()
1641        }
1642    }
1643
1644    #[test]
1645    fn test_get_sst_object_ids() {
1646        let mut version = HummockVersion {
1647            id: HummockVersionId::new(0),
1648            levels: HashMap::from_iter([(
1649                0,
1650                Levels {
1651                    levels: vec![],
1652                    l0: OverlappingLevel {
1653                        sub_levels: vec![],
1654                        total_file_size: 0,
1655                        uncompressed_file_size: 0,
1656                    },
1657                    ..Default::default()
1658                },
1659            )]),
1660            ..Default::default()
1661        };
1662        assert_eq!(version.get_object_ids(false).count(), 0);
1663
1664        // Add to sub level
1665        version
1666            .levels
1667            .get_mut(&0)
1668            .unwrap()
1669            .l0
1670            .sub_levels
1671            .push(Level {
1672                table_infos: vec![
1673                    SstableInfoInner {
1674                        object_id: 11.into(),
1675                        sst_id: 11.into(),
1676                        ..Default::default()
1677                    }
1678                    .into(),
1679                ],
1680                ..Default::default()
1681            });
1682        assert_eq!(version.get_object_ids(false).count(), 1);
1683
1684        // Add to non sub level
1685        version.levels.get_mut(&0).unwrap().levels.push(Level {
1686            table_infos: vec![
1687                SstableInfoInner {
1688                    object_id: 22.into(),
1689                    sst_id: 22.into(),
1690                    ..Default::default()
1691                }
1692                .into(),
1693            ],
1694            ..Default::default()
1695        });
1696        assert_eq!(version.get_object_ids(false).count(), 2);
1697    }
1698
1699    #[test]
1700    fn test_apply_version_delta() {
1701        let mut version = HummockVersion {
1702            id: HummockVersionId::new(0),
1703            levels: HashMap::from_iter([
1704                (
1705                    0,
1706                    build_initial_compaction_group_levels(
1707                        0,
1708                        &CompactionConfig {
1709                            max_level: 6,
1710                            ..Default::default()
1711                        },
1712                    ),
1713                ),
1714                (
1715                    1,
1716                    build_initial_compaction_group_levels(
1717                        1,
1718                        &CompactionConfig {
1719                            max_level: 6,
1720                            ..Default::default()
1721                        },
1722                    ),
1723                ),
1724            ]),
1725            ..Default::default()
1726        };
1727        let version_delta = HummockVersionDelta {
1728            id: HummockVersionId::new(1),
1729            group_deltas: HashMap::from_iter([
1730                (
1731                    2,
1732                    GroupDeltas {
1733                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1734                            group_config: Some(CompactionConfig {
1735                                max_level: 6,
1736                                ..Default::default()
1737                            }),
1738                            ..Default::default()
1739                        }))],
1740                    },
1741                ),
1742                (
1743                    0,
1744                    GroupDeltas {
1745                        group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1746                    },
1747                ),
1748                (
1749                    1,
1750                    GroupDeltas {
1751                        group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1752                            1,
1753                            0,
1754                            HashSet::new(),
1755                            vec![
1756                                SstableInfoInner {
1757                                    object_id: 1.into(),
1758                                    sst_id: 1.into(),
1759                                    ..Default::default()
1760                                }
1761                                .into(),
1762                            ],
1763                            0,
1764                            version
1765                                .levels
1766                                .get(&1)
1767                                .as_ref()
1768                                .unwrap()
1769                                .compaction_group_version_id,
1770                        ))],
1771                    },
1772                ),
1773            ]),
1774            ..Default::default()
1775        };
1776        let version_delta = version_delta;
1777
1778        version.apply_version_delta(&version_delta);
1779        let mut cg1 = build_initial_compaction_group_levels(
1780            1,
1781            &CompactionConfig {
1782                max_level: 6,
1783                ..Default::default()
1784            },
1785        );
1786        cg1.levels[0] = Level {
1787            level_idx: 1,
1788            level_type: LevelType::Nonoverlapping,
1789            table_infos: vec![
1790                SstableInfoInner {
1791                    object_id: 1.into(),
1792                    sst_id: 1.into(),
1793                    ..Default::default()
1794                }
1795                .into(),
1796            ],
1797            ..Default::default()
1798        };
1799        assert_eq!(
1800            version,
1801            HummockVersion {
1802                id: HummockVersionId::new(1),
1803                levels: HashMap::from_iter([
1804                    (
1805                        2,
1806                        build_initial_compaction_group_levels(
1807                            2,
1808                            &CompactionConfig {
1809                                max_level: 6,
1810                                ..Default::default()
1811                            },
1812                        ),
1813                    ),
1814                    (1, cg1),
1815                ]),
1816                ..Default::default()
1817            }
1818        );
1819    }
1820
1821    fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1822        gen_sst_info_impl(object_id, table_ids, left, right).into()
1823    }
1824
1825    fn gen_sst_info_impl(
1826        object_id: u64,
1827        table_ids: Vec<u32>,
1828        left: Bytes,
1829        right: Bytes,
1830    ) -> SstableInfoInner {
1831        SstableInfoInner {
1832            object_id: object_id.into(),
1833            sst_id: object_id.into(),
1834            key_range: KeyRange {
1835                left,
1836                right,
1837                right_exclusive: false,
1838            },
1839            table_ids,
1840            file_size: 100,
1841            sst_size: 100,
1842            uncompressed_file_size: 100,
1843            ..Default::default()
1844        }
1845    }
1846
1847    #[test]
1848    fn test_merge_levels() {
1849        let mut left_levels = build_initial_compaction_group_levels(
1850            1,
1851            &CompactionConfig {
1852                max_level: 6,
1853                ..Default::default()
1854            },
1855        );
1856
1857        let mut right_levels = build_initial_compaction_group_levels(
1858            2,
1859            &CompactionConfig {
1860                max_level: 6,
1861                ..Default::default()
1862            },
1863        );
1864
1865        left_levels.levels[0] = Level {
1866            level_idx: 1,
1867            level_type: LevelType::Nonoverlapping,
1868            table_infos: vec![
1869                gen_sst_info(
1870                    1,
1871                    vec![3],
1872                    FullKey::for_test(
1873                        TableId::new(3),
1874                        gen_key_from_str(VirtualNode::from_index(1), "1"),
1875                        0,
1876                    )
1877                    .encode()
1878                    .into(),
1879                    FullKey::for_test(
1880                        TableId::new(3),
1881                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1882                        0,
1883                    )
1884                    .encode()
1885                    .into(),
1886                ),
1887                gen_sst_info(
1888                    10,
1889                    vec![3, 4],
1890                    FullKey::for_test(
1891                        TableId::new(3),
1892                        gen_key_from_str(VirtualNode::from_index(201), "1"),
1893                        0,
1894                    )
1895                    .encode()
1896                    .into(),
1897                    FullKey::for_test(
1898                        TableId::new(4),
1899                        gen_key_from_str(VirtualNode::from_index(10), "1"),
1900                        0,
1901                    )
1902                    .encode()
1903                    .into(),
1904                ),
1905                gen_sst_info(
1906                    11,
1907                    vec![4],
1908                    FullKey::for_test(
1909                        TableId::new(4),
1910                        gen_key_from_str(VirtualNode::from_index(11), "1"),
1911                        0,
1912                    )
1913                    .encode()
1914                    .into(),
1915                    FullKey::for_test(
1916                        TableId::new(4),
1917                        gen_key_from_str(VirtualNode::from_index(200), "1"),
1918                        0,
1919                    )
1920                    .encode()
1921                    .into(),
1922                ),
1923            ],
1924            total_file_size: 300,
1925            ..Default::default()
1926        };
1927
1928        left_levels.l0.sub_levels.push(Level {
1929            level_idx: 0,
1930            table_infos: vec![gen_sst_info(
1931                3,
1932                vec![3],
1933                FullKey::for_test(
1934                    TableId::new(3),
1935                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1936                    0,
1937                )
1938                .encode()
1939                .into(),
1940                FullKey::for_test(
1941                    TableId::new(3),
1942                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1943                    0,
1944                )
1945                .encode()
1946                .into(),
1947            )],
1948            sub_level_id: 101,
1949            level_type: LevelType::Overlapping,
1950            total_file_size: 100,
1951            ..Default::default()
1952        });
1953
1954        left_levels.l0.sub_levels.push(Level {
1955            level_idx: 0,
1956            table_infos: vec![gen_sst_info(
1957                3,
1958                vec![3],
1959                FullKey::for_test(
1960                    TableId::new(3),
1961                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1962                    0,
1963                )
1964                .encode()
1965                .into(),
1966                FullKey::for_test(
1967                    TableId::new(3),
1968                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1969                    0,
1970                )
1971                .encode()
1972                .into(),
1973            )],
1974            sub_level_id: 103,
1975            level_type: LevelType::Overlapping,
1976            total_file_size: 100,
1977            ..Default::default()
1978        });
1979
1980        left_levels.l0.sub_levels.push(Level {
1981            level_idx: 0,
1982            table_infos: vec![gen_sst_info(
1983                3,
1984                vec![3],
1985                FullKey::for_test(
1986                    TableId::new(3),
1987                    gen_key_from_str(VirtualNode::from_index(1), "1"),
1988                    0,
1989                )
1990                .encode()
1991                .into(),
1992                FullKey::for_test(
1993                    TableId::new(3),
1994                    gen_key_from_str(VirtualNode::from_index(200), "1"),
1995                    0,
1996                )
1997                .encode()
1998                .into(),
1999            )],
2000            sub_level_id: 105,
2001            level_type: LevelType::Nonoverlapping,
2002            total_file_size: 100,
2003            ..Default::default()
2004        });
2005
2006        right_levels.levels[0] = Level {
2007            level_idx: 1,
2008            level_type: LevelType::Nonoverlapping,
2009            table_infos: vec![
2010                gen_sst_info(
2011                    1,
2012                    vec![5],
2013                    FullKey::for_test(
2014                        TableId::new(5),
2015                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2016                        0,
2017                    )
2018                    .encode()
2019                    .into(),
2020                    FullKey::for_test(
2021                        TableId::new(5),
2022                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2023                        0,
2024                    )
2025                    .encode()
2026                    .into(),
2027                ),
2028                gen_sst_info(
2029                    10,
2030                    vec![5, 6],
2031                    FullKey::for_test(
2032                        TableId::new(5),
2033                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2034                        0,
2035                    )
2036                    .encode()
2037                    .into(),
2038                    FullKey::for_test(
2039                        TableId::new(6),
2040                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2041                        0,
2042                    )
2043                    .encode()
2044                    .into(),
2045                ),
2046                gen_sst_info(
2047                    11,
2048                    vec![6],
2049                    FullKey::for_test(
2050                        TableId::new(6),
2051                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2052                        0,
2053                    )
2054                    .encode()
2055                    .into(),
2056                    FullKey::for_test(
2057                        TableId::new(6),
2058                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2059                        0,
2060                    )
2061                    .encode()
2062                    .into(),
2063                ),
2064            ],
2065            total_file_size: 300,
2066            ..Default::default()
2067        };
2068
2069        right_levels.l0.sub_levels.push(Level {
2070            level_idx: 0,
2071            table_infos: vec![gen_sst_info(
2072                3,
2073                vec![5],
2074                FullKey::for_test(
2075                    TableId::new(5),
2076                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2077                    0,
2078                )
2079                .encode()
2080                .into(),
2081                FullKey::for_test(
2082                    TableId::new(5),
2083                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2084                    0,
2085                )
2086                .encode()
2087                .into(),
2088            )],
2089            sub_level_id: 101,
2090            level_type: LevelType::Overlapping,
2091            total_file_size: 100,
2092            ..Default::default()
2093        });
2094
2095        right_levels.l0.sub_levels.push(Level {
2096            level_idx: 0,
2097            table_infos: vec![gen_sst_info(
2098                5,
2099                vec![5],
2100                FullKey::for_test(
2101                    TableId::new(5),
2102                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2103                    0,
2104                )
2105                .encode()
2106                .into(),
2107                FullKey::for_test(
2108                    TableId::new(5),
2109                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2110                    0,
2111                )
2112                .encode()
2113                .into(),
2114            )],
2115            sub_level_id: 102,
2116            level_type: LevelType::Overlapping,
2117            total_file_size: 100,
2118            ..Default::default()
2119        });
2120
2121        right_levels.l0.sub_levels.push(Level {
2122            level_idx: 0,
2123            table_infos: vec![gen_sst_info(
2124                3,
2125                vec![5],
2126                FullKey::for_test(
2127                    TableId::new(5),
2128                    gen_key_from_str(VirtualNode::from_index(1), "1"),
2129                    0,
2130                )
2131                .encode()
2132                .into(),
2133                FullKey::for_test(
2134                    TableId::new(5),
2135                    gen_key_from_str(VirtualNode::from_index(200), "1"),
2136                    0,
2137                )
2138                .encode()
2139                .into(),
2140            )],
2141            sub_level_id: 103,
2142            level_type: LevelType::Nonoverlapping,
2143            total_file_size: 100,
2144            ..Default::default()
2145        });
2146
2147        {
2148            // test empty
2149            let mut left_levels = Levels::default();
2150            let right_levels = Levels::default();
2151
2152            group_split::merge_levels(&mut left_levels, right_levels);
2153        }
2154
2155        {
2156            // test empty left
2157            let mut left_levels = build_initial_compaction_group_levels(
2158                1,
2159                &CompactionConfig {
2160                    max_level: 6,
2161                    ..Default::default()
2162                },
2163            );
2164            let right_levels = right_levels.clone();
2165
2166            group_split::merge_levels(&mut left_levels, right_levels);
2167
2168            assert!(left_levels.l0.sub_levels.len() == 3);
2169            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2170            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2171            assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2172            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2173            assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2174            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2175
2176            assert!(left_levels.levels[0].level_idx == 1);
2177            assert_eq!(300, left_levels.levels[0].total_file_size);
2178        }
2179
2180        {
2181            // test empty right
2182            let mut left_levels = left_levels.clone();
2183            let right_levels = build_initial_compaction_group_levels(
2184                2,
2185                &CompactionConfig {
2186                    max_level: 6,
2187                    ..Default::default()
2188                },
2189            );
2190
2191            group_split::merge_levels(&mut left_levels, right_levels);
2192
2193            assert!(left_levels.l0.sub_levels.len() == 3);
2194            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2195            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2196            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2197            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2198            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2199            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2200
2201            assert!(left_levels.levels[0].level_idx == 1);
2202            assert_eq!(300, left_levels.levels[0].total_file_size);
2203        }
2204
2205        {
2206            let mut left_levels = left_levels.clone();
2207            let right_levels = right_levels.clone();
2208
2209            group_split::merge_levels(&mut left_levels, right_levels);
2210
2211            assert!(left_levels.l0.sub_levels.len() == 6);
2212            assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2213            assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2214            assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2215            assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2216            assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2217            assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2218            assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2219            assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2220            assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2221            assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2222            assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2223            assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2224
2225            assert!(left_levels.levels[0].level_idx == 1);
2226            assert_eq!(600, left_levels.levels[0].total_file_size);
2227        }
2228    }
2229
2230    #[test]
2231    fn test_get_split_pos() {
2232        let epoch = test_epoch(1);
2233        let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2234        let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2235        let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2236
2237        let ssts = vec![s1, s2, s3];
2238        let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2239
2240        let pos = group_split::get_split_pos(&ssts, split_key.clone());
2241        assert_eq!(1, pos);
2242
2243        let pos = group_split::get_split_pos(&vec![], split_key);
2244        assert_eq!(0, pos);
2245    }
2246
2247    #[test]
2248    fn test_split_sst() {
2249        let epoch = test_epoch(1);
2250        let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2251
2252        {
2253            let split_key = group_split::build_split_key(3, VirtualNode::ZERO);
2254            let origin_sst = sst.clone();
2255            let sst_size = origin_sst.sst_size;
2256            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2257            assert_eq!(SstSplitType::Both, split_type);
2258
2259            let mut new_sst_id = 10.into();
2260            let (origin_sst, branched_sst) = group_split::split_sst(
2261                origin_sst,
2262                &mut new_sst_id,
2263                split_key,
2264                sst_size / 2,
2265                sst_size / 2,
2266            );
2267
2268            let origin_sst = origin_sst.unwrap();
2269            let branched_sst = branched_sst.unwrap();
2270
2271            assert!(origin_sst.key_range.right_exclusive);
2272            assert!(
2273                origin_sst
2274                    .key_range
2275                    .right
2276                    .cmp(&branched_sst.key_range.left)
2277                    .is_le()
2278            );
2279            assert!(origin_sst.table_ids.is_sorted());
2280            assert!(branched_sst.table_ids.is_sorted());
2281            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2282            assert!(branched_sst.sst_size < origin_sst.file_size);
2283            assert_eq!(10, branched_sst.sst_id);
2284            assert_eq!(11, origin_sst.sst_id);
2285            assert_eq!(&3, branched_sst.table_ids.first().unwrap()); // split table_id to right
2286        }
2287
2288        {
2289            // test un-exist table_id
2290            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2291            let origin_sst = sst.clone();
2292            let sst_size = origin_sst.sst_size;
2293            let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2294            assert_eq!(SstSplitType::Both, split_type);
2295
2296            let mut new_sst_id = 10.into();
2297            let (origin_sst, branched_sst) = group_split::split_sst(
2298                origin_sst,
2299                &mut new_sst_id,
2300                split_key,
2301                sst_size / 2,
2302                sst_size / 2,
2303            );
2304
2305            let origin_sst = origin_sst.unwrap();
2306            let branched_sst = branched_sst.unwrap();
2307
2308            assert!(origin_sst.key_range.right_exclusive);
2309            assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2310            assert!(origin_sst.table_ids.is_sorted());
2311            assert!(branched_sst.table_ids.is_sorted());
2312            assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2313            assert!(branched_sst.sst_size < origin_sst.file_size);
2314            assert_eq!(10, branched_sst.sst_id);
2315            assert_eq!(11, origin_sst.sst_id);
2316            assert_eq!(&5, branched_sst.table_ids.first().unwrap()); // split table_id to right
2317        }
2318
2319        {
2320            let split_key = group_split::build_split_key(6, VirtualNode::ZERO);
2321            let origin_sst = sst.clone();
2322            let split_type = group_split::need_to_split(&origin_sst, split_key);
2323            assert_eq!(SstSplitType::Left, split_type);
2324        }
2325
2326        {
2327            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2328            let origin_sst = sst.clone();
2329            let split_type = group_split::need_to_split(&origin_sst, split_key);
2330            assert_eq!(SstSplitType::Both, split_type);
2331
2332            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2333            let origin_sst = sst.clone();
2334            let split_type = group_split::need_to_split(&origin_sst, split_key);
2335            assert_eq!(SstSplitType::Right, split_type);
2336        }
2337
2338        {
2339            // test key_range left = right
2340            let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2341            sst.key_range.right = sst.key_range.left.clone();
2342            let sst: SstableInfo = sst.into();
2343            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2344            let origin_sst = sst.clone();
2345            let sst_size = origin_sst.sst_size;
2346
2347            let mut new_sst_id = 10.into();
2348            let (origin_sst, branched_sst) = group_split::split_sst(
2349                origin_sst,
2350                &mut new_sst_id,
2351                split_key,
2352                sst_size / 2,
2353                sst_size / 2,
2354            );
2355
2356            assert!(origin_sst.is_none());
2357            assert!(branched_sst.is_some());
2358        }
2359    }
2360
2361    #[test]
2362    fn test_split_sst_info_for_level() {
2363        let mut version = HummockVersion {
2364            id: HummockVersionId(0),
2365            levels: HashMap::from_iter([(
2366                1,
2367                build_initial_compaction_group_levels(
2368                    1,
2369                    &CompactionConfig {
2370                        max_level: 6,
2371                        ..Default::default()
2372                    },
2373                ),
2374            )]),
2375            ..Default::default()
2376        };
2377
2378        let cg1 = version.levels.get_mut(&1).unwrap();
2379
2380        cg1.levels[0] = Level {
2381            level_idx: 1,
2382            level_type: LevelType::Nonoverlapping,
2383            table_infos: vec![
2384                gen_sst_info(
2385                    1,
2386                    vec![3],
2387                    FullKey::for_test(
2388                        TableId::new(3),
2389                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2390                        0,
2391                    )
2392                    .encode()
2393                    .into(),
2394                    FullKey::for_test(
2395                        TableId::new(3),
2396                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2397                        0,
2398                    )
2399                    .encode()
2400                    .into(),
2401                ),
2402                gen_sst_info(
2403                    10,
2404                    vec![3, 4],
2405                    FullKey::for_test(
2406                        TableId::new(3),
2407                        gen_key_from_str(VirtualNode::from_index(201), "1"),
2408                        0,
2409                    )
2410                    .encode()
2411                    .into(),
2412                    FullKey::for_test(
2413                        TableId::new(4),
2414                        gen_key_from_str(VirtualNode::from_index(10), "1"),
2415                        0,
2416                    )
2417                    .encode()
2418                    .into(),
2419                ),
2420                gen_sst_info(
2421                    11,
2422                    vec![4],
2423                    FullKey::for_test(
2424                        TableId::new(4),
2425                        gen_key_from_str(VirtualNode::from_index(11), "1"),
2426                        0,
2427                    )
2428                    .encode()
2429                    .into(),
2430                    FullKey::for_test(
2431                        TableId::new(4),
2432                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2433                        0,
2434                    )
2435                    .encode()
2436                    .into(),
2437                ),
2438            ],
2439            total_file_size: 300,
2440            ..Default::default()
2441        };
2442
2443        cg1.l0.sub_levels.push(Level {
2444            level_idx: 0,
2445            table_infos: vec![
2446                gen_sst_info(
2447                    2,
2448                    vec![2],
2449                    FullKey::for_test(
2450                        TableId::new(0),
2451                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2452                        0,
2453                    )
2454                    .encode()
2455                    .into(),
2456                    FullKey::for_test(
2457                        TableId::new(2),
2458                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2459                        0,
2460                    )
2461                    .encode()
2462                    .into(),
2463                ),
2464                gen_sst_info(
2465                    22,
2466                    vec![2],
2467                    FullKey::for_test(
2468                        TableId::new(0),
2469                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2470                        0,
2471                    )
2472                    .encode()
2473                    .into(),
2474                    FullKey::for_test(
2475                        TableId::new(2),
2476                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2477                        0,
2478                    )
2479                    .encode()
2480                    .into(),
2481                ),
2482                gen_sst_info(
2483                    23,
2484                    vec![2],
2485                    FullKey::for_test(
2486                        TableId::new(0),
2487                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2488                        0,
2489                    )
2490                    .encode()
2491                    .into(),
2492                    FullKey::for_test(
2493                        TableId::new(2),
2494                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2495                        0,
2496                    )
2497                    .encode()
2498                    .into(),
2499                ),
2500                gen_sst_info(
2501                    24,
2502                    vec![2],
2503                    FullKey::for_test(
2504                        TableId::new(2),
2505                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2506                        0,
2507                    )
2508                    .encode()
2509                    .into(),
2510                    FullKey::for_test(
2511                        TableId::new(2),
2512                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2513                        0,
2514                    )
2515                    .encode()
2516                    .into(),
2517                ),
2518                gen_sst_info(
2519                    25,
2520                    vec![2],
2521                    FullKey::for_test(
2522                        TableId::new(0),
2523                        gen_key_from_str(VirtualNode::from_index(1), "1"),
2524                        0,
2525                    )
2526                    .encode()
2527                    .into(),
2528                    FullKey::for_test(
2529                        TableId::new(0),
2530                        gen_key_from_str(VirtualNode::from_index(200), "1"),
2531                        0,
2532                    )
2533                    .encode()
2534                    .into(),
2535                ),
2536            ],
2537            sub_level_id: 101,
2538            level_type: LevelType::Overlapping,
2539            total_file_size: 300,
2540            ..Default::default()
2541        });
2542
2543        {
2544            // split Overlapping level
2545            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2546
2547            let mut new_sst_id = 100.into();
2548            let x = group_split::split_sst_info_for_level_v2(
2549                &mut cg1.l0.sub_levels[0],
2550                &mut new_sst_id,
2551                split_key,
2552            );
2553            // assert_eq!(3, x.len());
2554            // assert_eq!(100, x[0].sst_id);
2555            // assert_eq!(100, x[0].sst_size);
2556            // assert_eq!(101, x[1].sst_id);
2557            // assert_eq!(100, x[1].sst_size);
2558            // assert_eq!(102, x[2].sst_id);
2559            // assert_eq!(100, x[2].sst_size);
2560
2561            let mut right_l0 = OverlappingLevel {
2562                sub_levels: vec![],
2563                total_file_size: 0,
2564                uncompressed_file_size: 0,
2565            };
2566
2567            right_l0.sub_levels.push(Level {
2568                level_idx: 0,
2569                table_infos: x,
2570                sub_level_id: 101,
2571                total_file_size: 100,
2572                level_type: LevelType::Overlapping,
2573                ..Default::default()
2574            });
2575
2576            let right_levels = Levels {
2577                levels: vec![],
2578                l0: right_l0,
2579                ..Default::default()
2580            };
2581
2582            merge_levels(cg1, right_levels);
2583        }
2584
2585        {
2586            // test split empty level
2587            let mut new_sst_id = 100.into();
2588            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2589            let x = group_split::split_sst_info_for_level_v2(
2590                &mut cg1.levels[2],
2591                &mut new_sst_id,
2592                split_key,
2593            );
2594
2595            assert!(x.is_empty());
2596        }
2597
2598        {
2599            // test split to right Nonoverlapping level
2600            let mut cg1 = cg1.clone();
2601            let split_key = group_split::build_split_key(1, VirtualNode::ZERO);
2602
2603            let mut new_sst_id = 100.into();
2604            let x = group_split::split_sst_info_for_level_v2(
2605                &mut cg1.levels[0],
2606                &mut new_sst_id,
2607                split_key,
2608            );
2609
2610            assert_eq!(3, x.len());
2611            assert_eq!(1, x[0].sst_id);
2612            assert_eq!(100, x[0].sst_size);
2613            assert_eq!(10, x[1].sst_id);
2614            assert_eq!(100, x[1].sst_size);
2615            assert_eq!(11, x[2].sst_id);
2616            assert_eq!(100, x[2].sst_size);
2617
2618            assert_eq!(0, cg1.levels[0].table_infos.len());
2619        }
2620
2621        {
2622            // test split to left Nonoverlapping level
2623            let mut cg1 = cg1.clone();
2624            let split_key = group_split::build_split_key(5, VirtualNode::ZERO);
2625
2626            let mut new_sst_id = 100.into();
2627            let x = group_split::split_sst_info_for_level_v2(
2628                &mut cg1.levels[0],
2629                &mut new_sst_id,
2630                split_key,
2631            );
2632
2633            assert_eq!(0, x.len());
2634            assert_eq!(3, cg1.levels[0].table_infos.len());
2635        }
2636
2637        // {
2638        //     // test split to both Nonoverlapping level
2639        //     let mut cg1 = cg1.clone();
2640        //     let split_key = build_split_key(3, VirtualNode::MAX);
2641
2642        //     let mut new_sst_id = 100;
2643        //     let x = group_split::split_sst_info_for_level_v2(
2644        //         &mut cg1.levels[0],
2645        //         &mut new_sst_id,
2646        //         split_key,
2647        //     );
2648
2649        //     assert_eq!(2, x.len());
2650        //     assert_eq!(100, x[0].sst_id);
2651        //     assert_eq!(100 / 2, x[0].sst_size);
2652        //     assert_eq!(11, x[1].sst_id);
2653        //     assert_eq!(100, x[1].sst_size);
2654        //     assert_eq!(vec![3, 4], x[0].table_ids);
2655
2656        //     assert_eq!(2, cg1.levels[0].table_infos.len());
2657        //     assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2658        //     assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2659        //     assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2660        // }
2661
2662        {
2663            // test split to both Nonoverlapping level
2664            let mut cg1 = cg1.clone();
2665            let split_key = group_split::build_split_key(4, VirtualNode::ZERO);
2666
2667            let mut new_sst_id = 100.into();
2668            let x = group_split::split_sst_info_for_level_v2(
2669                &mut cg1.levels[0],
2670                &mut new_sst_id,
2671                split_key,
2672            );
2673
2674            assert_eq!(2, x.len());
2675            assert_eq!(100, x[0].sst_id);
2676            assert_eq!(100 / 2, x[0].sst_size);
2677            assert_eq!(11, x[1].sst_id);
2678            assert_eq!(100, x[1].sst_size);
2679            assert_eq!(vec![4], x[1].table_ids);
2680
2681            assert_eq!(2, cg1.levels[0].table_infos.len());
2682            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2683            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2684            assert_eq!(vec![3], cg1.levels[0].table_infos[1].table_ids);
2685        }
2686    }
2687}