risingwave_hummock_sdk/
version.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{
30    ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
31};
32use crate::compaction_group::StaticCompactionGroupId;
33use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
34use crate::level::LevelsCommon;
35use crate::sstable_info::SstableInfo;
36use crate::table_watermark::TableWatermarks;
37use crate::vector_index::{VectorIndex, VectorIndexDelta};
38use crate::{
39    CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
40    HummockSstableObjectId, HummockVersionId,
41};
42
/// The largest `HummockVersionId`, constructed from `i64::MAX` (cast to the id's inner
/// integer type).
pub const MAX_HUMMOCK_VERSION_ID: HummockVersionId = HummockVersionId::new(i64::MAX as _);
44
/// Per-table state of a Hummock version: for each state table, its committed epoch and
/// the compaction group it belongs to, plus a derived reverse index from compaction
/// group to member tables.
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionStateTableInfo {
    /// Primary mapping from table id to its state table info.
    state_table_info: HashMap<TableId, PbStateTableInfo>,

    // in memory index
    /// Reverse index derived from `state_table_info`: compaction group id -> ids of the
    /// tables in that group. It is rebuilt when constructing from protobuf and is not
    /// emitted when converting back to protobuf.
    compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
}
52
impl HummockVersionStateTableInfo {
    /// Creates an instance that tracks no tables and no compaction groups.
    pub fn empty() -> Self {
        Self {
            state_table_info: HashMap::new(),
            compaction_group_member_tables: HashMap::new(),
        }
    }

    /// Derives the compaction group -> member table ids index from a table -> info map.
    ///
    /// The `assert!` guards against inserting the same table id twice into one group.
    fn build_compaction_group_member_tables(
        state_table_info: &HashMap<TableId, PbStateTableInfo>,
    ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
        let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
        for (table_id, info) in state_table_info {
            assert!(
                ret.entry(info.compaction_group_id)
                    .or_default()
                    .insert(*table_id)
            );
        }
        ret
    }

    /// Returns a freshly built table id -> compaction group id map.
    pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
        self.state_table_info
            .iter()
            .map(|(table_id, info)| (*table_id, info.compaction_group_id))
            .collect()
    }

    /// Builds from a borrowed protobuf map. Each entry is copied, and the compaction
    /// group index is derived from the copy.
    pub fn from_protobuf(state_table_info: &HashMap<TableId, PbStateTableInfo>) -> Self {
        let state_table_info = state_table_info
            .iter()
            .map(|(table_id, info)| (*table_id, *info))
            .collect();
        let compaction_group_member_tables =
            Self::build_compaction_group_member_tables(&state_table_info);
        Self {
            state_table_info,
            compaction_group_member_tables,
        }
    }

    /// Like `from_protobuf`, but takes ownership of the map instead of copying it.
    pub fn from_protobuf_owned(state_table_info: HashMap<TableId, PbStateTableInfo>) -> Self {
        let compaction_group_member_tables =
            Self::build_compaction_group_member_tables(&state_table_info);
        Self {
            state_table_info,
            compaction_group_member_tables,
        }
    }

    /// Applies a version delta to the per-table state and keeps the compaction group
    /// index in sync.
    ///
    /// Tables in `removed_table_id` are removed first, and any `delta` entries for them
    /// are ignored. Every other `delta` entry inserts or overwrites its table's info. A
    /// table whose compaction group id changes is moved between groups, and a group left
    /// with no tables is dropped from the index.
    ///
    /// Returns a pair:
    /// - a map from every table touched to its previous info. This is `None` for a newly
    ///   added table and `Some(previous)` for a removed or overwritten table, even when
    ///   the new info equals the old one.
    /// - whether any table's committed epoch advanced. Newly added tables count as
    ///   advancing.
    ///
    /// Removing a table id that is not present only logs a warning.
    ///
    /// # Panics
    /// - if a delta would move a table's committed epoch backwards;
    /// - if the compaction group index is inconsistent with `state_table_info` (checked
    ///   by the asserts below, and fully re-verified by `debug_assert_eq!` in debug
    ///   builds).
    pub fn apply_delta(
        &mut self,
        delta: &HashMap<TableId, StateTableInfoDelta>,
        removed_table_id: &HashSet<TableId>,
    ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
        let mut changed_table = HashMap::new();
        let mut has_bumped_committed_epoch = false;
        // Removes `table_id` from its group's member set and drops the group entry once
        // it becomes empty. Panics if the group or the table is missing from the index.
        fn remove_table_from_compaction_group(
            compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
            compaction_group_id: CompactionGroupId,
            table_id: TableId,
        ) {
            let member_tables = compaction_group_member_tables
                .get_mut(&compaction_group_id)
                .expect("should exist");
            assert!(member_tables.remove(&table_id));
            if member_tables.is_empty() {
                assert!(
                    compaction_group_member_tables
                        .remove(&compaction_group_id)
                        .is_some()
                );
            }
        }
        // Removals are processed before updates, so a table both removed and updated in
        // the same delta ends up removed.
        for table_id in removed_table_id {
            if let Some(prev_info) = self.state_table_info.remove(table_id) {
                remove_table_from_compaction_group(
                    &mut self.compaction_group_member_tables,
                    prev_info.compaction_group_id,
                    *table_id,
                );
                assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
            } else {
                warn!(
                    %table_id,
                    "table to remove does not exist"
                );
            }
        }
        for (table_id, delta) in delta {
            if removed_table_id.contains(table_id) {
                continue;
            }
            let new_info = StateTableInfo {
                committed_epoch: delta.committed_epoch,
                compaction_group_id: delta.compaction_group_id,
            };
            match self.state_table_info.entry(*table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_info = entry.get_mut();
                    // Committed epochs must be monotonically non-decreasing per table.
                    assert!(
                        new_info.committed_epoch >= prev_info.committed_epoch,
                        "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
                        table_id,
                        prev_info,
                        new_info
                    );
                    if new_info.committed_epoch > prev_info.committed_epoch {
                        has_bumped_committed_epoch = true;
                    }
                    if prev_info.compaction_group_id != new_info.compaction_group_id {
                        // table moved to another compaction group
                        remove_table_from_compaction_group(
                            &mut self.compaction_group_member_tables,
                            prev_info.compaction_group_id,
                            *table_id,
                        );
                        assert!(
                            self.compaction_group_member_tables
                                .entry(new_info.compaction_group_id)
                                .or_default()
                                .insert(*table_id)
                        );
                    }
                    // Overwrite in place and record what was there before.
                    let prev_info = replace(prev_info, new_info);
                    changed_table.insert(*table_id, Some(prev_info));
                }
                Entry::Vacant(entry) => {
                    assert!(
                        self.compaction_group_member_tables
                            .entry(new_info.compaction_group_id)
                            .or_default()
                            .insert(*table_id)
                    );
                    // A newly tracked table counts as a committed-epoch bump.
                    has_bumped_committed_epoch = true;
                    entry.insert(new_info);
                    changed_table.insert(*table_id, None);
                }
            }
        }
        // In debug builds, verify the incrementally maintained index against a rebuild.
        debug_assert_eq!(
            self.compaction_group_member_tables,
            Self::build_compaction_group_member_tables(&self.state_table_info)
        );
        (changed_table, has_bumped_committed_epoch)
    }

    /// Returns the table id -> state table info map.
    pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
        &self.state_table_info
    }

    /// Returns the member table ids of `compaction_group_id`, or a shared empty set when
    /// the group has no tables in the index.
    pub fn compaction_group_member_table_ids(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> &BTreeSet<TableId> {
        static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
        self.compaction_group_member_tables
            .get(&compaction_group_id)
            .unwrap_or_else(|| EMPTY_SET.deref())
    }

    /// Returns the full compaction group id -> member table ids index.
    pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
        &self.compaction_group_member_tables
    }

    /// Returns the largest committed epoch across all tables, or `None` when no table is
    /// tracked.
    pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
        self.state_table_info
            .values()
            .map(|info| info.committed_epoch)
            .max()
    }
}
226
/// In-memory representation of a Hummock version. It is generic over the SST info type
/// `T` stored in levels and the type `L` stored in table change logs, which defaults to
/// `T`.
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionCommon<T, L = T> {
    pub id: HummockVersionId,
    /// Level structure of each compaction group.
    pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
    /// Legacy version-wide committed epoch. Per-table committed epochs are tracked in
    /// `state_table_info`.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
    pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
    pub state_table_info: HummockVersionStateTableInfo,
    pub vector_indexes: HashMap<TableId, VectorIndex>,
}

/// Version whose levels and change logs both hold `SstableInfo`.
pub type HummockVersion = HummockVersionCommon<SstableInfo>;

/// Version whose change logs carry `()` instead of SST infos. Produced by
/// `HummockVersion::split_change_log`, which moves the change-log SSTs out.
pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
242
243impl Default for HummockVersion {
244    fn default() -> Self {
245        HummockVersion::from(&PbHummockVersion::default())
246    }
247}
248
249impl<T> HummockVersionCommon<T>
250where
251    T: for<'a> From<&'a PbSstableInfo>,
252    PbSstableInfo: for<'a> From<&'a T>,
253{
254    /// Convert the `PbHummockVersion` received from rpc to `HummockVersion`. No need to
255    /// maintain backward compatibility.
256    pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
257        pb_version.into()
258    }
259
260    /// Convert the `PbHummockVersion` deserialized from persisted state to `HummockVersion`.
261    /// We should maintain backward compatibility.
262    pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
263        pb_version.into()
264    }
265
266    pub fn to_protobuf(&self) -> PbHummockVersion {
267        self.into()
268    }
269}
270
271impl<T> HummockVersionCommon<T>
272where
273    T: From<PbSstableInfo>,
274    PbSstableInfo: for<'a> From<&'a T>,
275{
276    /// Convert an owned `PbHummockVersion` deserialized from persisted state to `HummockVersion`,
277    /// moving data instead of cloning for better performance on large checkpoints.
278    pub fn from_persisted_protobuf_owned(pb_version: PbHummockVersion) -> Self {
279        pb_version.into()
280    }
281}
282
283impl HummockVersion {
284    pub fn estimated_encode_len(&self) -> usize {
285        self.levels.len() * size_of::<CompactionGroupId>()
286            + self
287                .levels
288                .values()
289                .map(|level| level.estimated_encode_len())
290                .sum::<usize>()
291            + self.table_watermarks.len() * size_of::<u32>()
292            + self
293                .table_watermarks
294                .values()
295                .map(|table_watermark| table_watermark.estimated_encode_len())
296                .sum::<usize>()
297            + self
298                .table_change_log
299                .values()
300                .map(|c| {
301                    c.iter()
302                        .map(|l| {
303                            l.old_value
304                                .iter()
305                                .chain(l.new_value.iter())
306                                .map(|s| s.estimated_encode_len())
307                                .sum::<usize>()
308                        })
309                        .sum::<usize>()
310                })
311                .sum::<usize>()
312    }
313}
314
315impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
316where
317    T: for<'a> From<&'a PbSstableInfo>,
318{
319    fn from(pb_version: &PbHummockVersion) -> Self {
320        #[expect(deprecated)]
321        Self {
322            id: pb_version.id,
323            levels: pb_version
324                .levels
325                .iter()
326                .map(|(group_id, levels)| (*group_id, LevelsCommon::from(levels)))
327                .collect(),
328            max_committed_epoch: pb_version.max_committed_epoch,
329            table_watermarks: pb_version
330                .table_watermarks
331                .iter()
332                .map(|(table_id, table_watermark)| {
333                    (*table_id, Arc::new(TableWatermarks::from(table_watermark)))
334                })
335                .collect(),
336            table_change_log: pb_version
337                .table_change_logs
338                .iter()
339                .map(|(table_id, change_log)| {
340                    (*table_id, TableChangeLogCommon::from_protobuf(change_log))
341                })
342                .collect(),
343            state_table_info: HummockVersionStateTableInfo::from_protobuf(
344                &pb_version.state_table_info,
345            ),
346            vector_indexes: pb_version
347                .vector_indexes
348                .iter()
349                .map(|(table_id, index)| (*table_id, index.clone().into()))
350                .collect(),
351        }
352    }
353}
354
355impl<T> From<PbHummockVersion> for HummockVersionCommon<T>
356where
357    T: From<PbSstableInfo>,
358{
359    fn from(pb_version: PbHummockVersion) -> Self {
360        #[expect(deprecated)]
361        Self {
362            id: pb_version.id,
363            levels: pb_version
364                .levels
365                .into_iter()
366                .map(|(group_id, levels)| (group_id, LevelsCommon::from(levels)))
367                .collect(),
368            max_committed_epoch: pb_version.max_committed_epoch,
369            table_watermarks: pb_version
370                .table_watermarks
371                .into_iter()
372                .map(|(table_id, table_watermark)| {
373                    (table_id, Arc::new(TableWatermarks::from(table_watermark)))
374                })
375                .collect(),
376            table_change_log: pb_version
377                .table_change_logs
378                .into_iter()
379                .map(|(table_id, change_log)| {
380                    (
381                        table_id,
382                        TableChangeLogCommon::from_protobuf_owned(change_log),
383                    )
384                })
385                .collect(),
386            state_table_info: HummockVersionStateTableInfo::from_protobuf_owned(
387                pb_version.state_table_info,
388            ),
389            vector_indexes: pb_version
390                .vector_indexes
391                .into_iter()
392                .map(|(table_id, index)| (table_id, index.into()))
393                .collect(),
394        }
395    }
396}
397
398impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
399where
400    PbSstableInfo: for<'a> From<&'a T>,
401{
402    fn from(version: &HummockVersionCommon<T>) -> Self {
403        #[expect(deprecated)]
404        Self {
405            id: version.id,
406            levels: version
407                .levels
408                .iter()
409                .map(|(group_id, levels)| (*group_id, levels.into()))
410                .collect(),
411            max_committed_epoch: version.max_committed_epoch,
412            table_watermarks: version
413                .table_watermarks
414                .iter()
415                .map(|(table_id, watermark)| (*table_id, watermark.as_ref().into()))
416                .collect(),
417            table_change_logs: version
418                .table_change_log
419                .iter()
420                .map(|(table_id, change_log)| (*table_id, change_log.to_protobuf()))
421                .collect(),
422            state_table_info: version.state_table_info.state_table_info.clone(),
423            vector_indexes: version
424                .vector_indexes
425                .iter()
426                .map(|(table_id, index)| (*table_id, index.clone().into()))
427                .collect(),
428        }
429    }
430}
431
432impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
433where
434    PbSstableInfo: From<T>,
435    PbSstableInfo: for<'a> From<&'a T>,
436{
437    fn from(version: HummockVersionCommon<T>) -> Self {
438        #[expect(deprecated)]
439        Self {
440            id: version.id,
441            levels: version
442                .levels
443                .into_iter()
444                .map(|(group_id, levels)| (group_id, levels.into()))
445                .collect(),
446            max_committed_epoch: version.max_committed_epoch,
447            table_watermarks: version
448                .table_watermarks
449                .into_iter()
450                .map(|(table_id, watermark)| (table_id, watermark.as_ref().into()))
451                .collect(),
452            table_change_logs: version
453                .table_change_log
454                .into_iter()
455                .map(|(table_id, change_log)| (table_id, change_log.to_protobuf()))
456                .collect(),
457            state_table_info: version.state_table_info.state_table_info.clone(),
458            vector_indexes: version
459                .vector_indexes
460                .into_iter()
461                .map(|(table_id, index)| (table_id, index.into()))
462                .collect(),
463        }
464    }
465}
466
467impl HummockVersion {
468    pub fn next_version_id(&self) -> HummockVersionId {
469        self.id + 1
470    }
471
472    pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
473        // for backward-compatibility of previous hummock version delta
474        self.state_table_info.state_table_info.is_empty()
475            && self.levels.values().any(|group| {
476                // state_table_info is not previously filled, but there previously exists some tables
477                #[expect(deprecated)]
478                !group.member_table_ids.is_empty()
479            })
480    }
481
482    pub fn may_fill_backward_compatible_state_table_info_delta(
483        &self,
484        delta: &mut HummockVersionDelta,
485    ) {
486        #[expect(deprecated)]
487        // for backward-compatibility of previous hummock version delta
488        for (cg_id, group) in &self.levels {
489            for table_id in &group.member_table_ids {
490                assert!(
491                    delta
492                        .state_table_info_delta
493                        .insert(
494                            (*table_id).into(),
495                            StateTableInfoDelta {
496                                committed_epoch: self.max_committed_epoch,
497                                compaction_group_id: *cg_id,
498                            }
499                        )
500                        .is_none(),
501                    "duplicate table id {} in cg {}",
502                    table_id,
503                    cg_id
504                );
505            }
506        }
507    }
508
509    pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
510        #[expect(deprecated)]
511        let mut init_version = HummockVersion {
512            id: FIRST_VERSION_ID,
513            levels: Default::default(),
514            max_committed_epoch: INVALID_EPOCH,
515            table_watermarks: HashMap::new(),
516            table_change_log: HashMap::new(),
517            state_table_info: HummockVersionStateTableInfo::empty(),
518            vector_indexes: Default::default(),
519        };
520        for group_id in [
521            StaticCompactionGroupId::StateDefault as CompactionGroupId,
522            StaticCompactionGroupId::MaterializedView as CompactionGroupId,
523        ] {
524            init_version.levels.insert(
525                group_id,
526                build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
527            );
528        }
529        init_version
530    }
531
532    pub fn version_delta_after(&self) -> HummockVersionDelta {
533        #[expect(deprecated)]
534        HummockVersionDelta {
535            id: self.next_version_id(),
536            prev_id: self.id,
537            trivial_move: false,
538            max_committed_epoch: self.max_committed_epoch,
539            group_deltas: Default::default(),
540            new_table_watermarks: HashMap::new(),
541            removed_table_ids: HashSet::new(),
542            change_log_delta: HashMap::new(),
543            state_table_info_delta: Default::default(),
544            vector_index_delta: Default::default(),
545        }
546    }
547
548    pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
549        let table_change_log = {
550            let mut table_change_log = HashMap::new();
551            for (table_id, log) in &mut self.table_change_log {
552                let change_log_iter =
553                    log.change_log_iter_mut()
554                        .map(|item| EpochNewChangeLogCommon {
555                            new_value: std::mem::take(&mut item.new_value),
556                            old_value: std::mem::take(&mut item.old_value),
557                            non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
558                            checkpoint_epoch: item.checkpoint_epoch,
559                        });
560                table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
561            }
562
563            table_change_log
564        };
565
566        let local_version = LocalHummockVersion::from(self);
567
568        (local_version, table_change_log)
569    }
570}
571
572impl<T, L> HummockVersionCommon<T, L> {
573    pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
574        self.state_table_info
575            .info()
576            .get(&table_id)
577            .map(|info| info.committed_epoch)
578    }
579}
580
/// In-memory representation of a delta that turns version `prev_id` into version `id`.
/// It is generic over the SST info type `T` used in group deltas and the type `L` used
/// in change-log deltas, which defaults to `T`.
#[derive(Debug, PartialEq, Clone)]
pub struct HummockVersionDeltaCommon<T, L = T> {
    pub id: HummockVersionId,
    pub prev_id: HummockVersionId,
    /// Level changes of each compaction group.
    pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
    /// Legacy delta-wide committed epoch, readable through
    /// `HummockVersionDelta::max_committed_epoch_for_migration`.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    pub trivial_move: bool,
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    pub removed_table_ids: HashSet<TableId>,
    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
    /// Per-table updates of committed epoch and compaction group id.
    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
}

/// Delta whose group deltas and change-log deltas both hold `SstableInfo`.
pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;

/// Delta whose change-log deltas carry `()` instead of SST infos.
pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
599
600impl Default for HummockVersionDelta {
601    fn default() -> Self {
602        HummockVersionDelta::from(&PbHummockVersionDelta::default())
603    }
604}
605
606impl<T> HummockVersionDeltaCommon<T>
607where
608    T: for<'a> From<&'a PbSstableInfo>,
609    PbSstableInfo: for<'a> From<&'a T>,
610{
611    /// Convert the `PbHummockVersionDelta` deserialized from persisted state to `HummockVersionDelta`.
612    /// We should maintain backward compatibility.
613    pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
614        delta.into()
615    }
616
617    /// Convert the `PbHummockVersionDelta` received from rpc to `HummockVersionDelta`. No need to
618    /// maintain backward compatibility.
619    pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
620        delta.into()
621    }
622
623    pub fn to_protobuf(&self) -> PbHummockVersionDelta {
624        self.into()
625    }
626}
627
628impl<T> HummockVersionDeltaCommon<T>
629where
630    T: From<PbSstableInfo>,
631    PbSstableInfo: for<'a> From<&'a T>,
632{
633    /// Convert an owned `PbHummockVersionDelta` deserialized from persisted state to
634    /// `HummockVersionDelta`, moving data instead of cloning.
635    pub fn from_persisted_protobuf_owned(delta: PbHummockVersionDelta) -> Self {
636        delta.into()
637    }
638}
639
/// Exposes the SST id of an SST info type.
pub trait SstableIdReader {
    fn sst_id(&self) -> HummockSstableId;
}

/// Exposes the id of the SST object that backs an SST info type.
pub trait ObjectIdReader {
    fn object_id(&self) -> HummockSstableObjectId;
}
647
648impl<T> HummockVersionDeltaCommon<T>
649where
650    T: SstableIdReader + ObjectIdReader,
651{
652    /// Get the newly added object ids from the version delta.
653    ///
654    /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`,
655    /// but it is possible that the object is moved or split from other compaction groups or levels.
656    pub fn newly_added_object_ids(
657        &self,
658        exclude_table_change_log: bool,
659    ) -> HashSet<HummockObjectId> {
660        // DO NOT REMOVE THIS LINE
661        // This is to ensure that when adding new variant to `HummockObjectId`,
662        // the compiler will warn us if we forget to handle it here.
663        match HummockObjectId::Sstable(0.into()) {
664            HummockObjectId::Sstable(_) => {}
665            HummockObjectId::VectorFile(_) => {}
666            HummockObjectId::HnswGraphFile(_) => {}
667        };
668        self.newly_added_sst_infos(exclude_table_change_log)
669            .map(|sst| HummockObjectId::Sstable(sst.object_id()))
670            .chain(
671                self.vector_index_delta
672                    .values()
673                    .flat_map(|vector_index_delta| {
674                        vector_index_delta
675                            .newly_added_objects()
676                            .map(|(object_id, _)| object_id)
677                    }),
678            )
679            .collect()
680    }
681
682    pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
683        self.newly_added_sst_infos(exclude_table_change_log)
684            .map(|sst| sst.sst_id())
685            .collect()
686    }
687}
688
689impl<T> HummockVersionDeltaCommon<T> {
690    pub fn newly_added_sst_infos(
691        &self,
692        exclude_table_change_log: bool,
693    ) -> impl Iterator<Item = &'_ T> {
694        let may_table_change_delta = if exclude_table_change_log {
695            None
696        } else {
697            Some(self.change_log_delta.values())
698        };
699        self.group_deltas
700            .values()
701            .flat_map(|group_deltas| {
702                group_deltas.group_deltas.iter().flat_map(|group_delta| {
703                    let sst_slice = match &group_delta {
704                        GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
705                        | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
706                            inserted_table_infos,
707                            ..
708                        }) => Some(inserted_table_infos.iter()),
709                        GroupDeltaCommon::GroupConstruct(_)
710                        | GroupDeltaCommon::GroupDestroy(_)
711                        | GroupDeltaCommon::GroupMerge(_)
712                        | GroupDeltaCommon::TruncateTables(_) => None,
713                    };
714                    sst_slice.into_iter().flatten()
715                })
716            })
717            .chain(
718                may_table_change_delta
719                    .map(|v| {
720                        v.flat_map(|delta| {
721                            let new_log = &delta.new_log;
722                            new_log.new_value.iter().chain(new_log.old_value.iter())
723                        })
724                    })
725                    .into_iter()
726                    .flatten(),
727            )
728    }
729}
730
impl HummockVersionDelta {
    /// Returns the deprecated delta-wide `max_committed_epoch`.
    ///
    /// It stays readable despite the deprecation so that migration code (per the method
    /// name) can still access the legacy value.
    #[expect(deprecated)]
    pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
        self.max_committed_epoch
    }
}
737
738impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
739where
740    T: for<'a> From<&'a PbSstableInfo>,
741{
742    fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
743        #[expect(deprecated)]
744        Self {
745            id: pb_version_delta.id,
746            prev_id: pb_version_delta.prev_id,
747            group_deltas: pb_version_delta
748                .group_deltas
749                .iter()
750                .map(|(group_id, deltas)| (*group_id, GroupDeltasCommon::from(deltas)))
751                .collect(),
752            max_committed_epoch: pb_version_delta.max_committed_epoch,
753            trivial_move: pb_version_delta.trivial_move,
754            new_table_watermarks: pb_version_delta
755                .new_table_watermarks
756                .iter()
757                .map(|(table_id, watermarks)| (*table_id, TableWatermarks::from(watermarks)))
758                .collect(),
759            removed_table_ids: pb_version_delta.removed_table_ids.iter().copied().collect(),
760            change_log_delta: pb_version_delta
761                .change_log_delta
762                .iter()
763                .map(|(table_id, log_delta)| {
764                    (
765                        *table_id,
766                        ChangeLogDeltaCommon {
767                            truncate_epoch: log_delta.truncate_epoch,
768                            new_log: log_delta.new_log.as_ref().unwrap().into(),
769                        },
770                    )
771                })
772                .collect(),
773
774            state_table_info_delta: pb_version_delta
775                .state_table_info_delta
776                .iter()
777                .map(|(table_id, delta)| (*table_id, *delta))
778                .collect(),
779            vector_index_delta: pb_version_delta
780                .vector_index_delta
781                .iter()
782                .map(|(table_id, delta)| (*table_id, delta.clone().into()))
783                .collect(),
784        }
785    }
786}
787
788impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
789where
790    PbSstableInfo: for<'a> From<&'a T>,
791{
792    fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
793        #[expect(deprecated)]
794        Self {
795            id: version_delta.id,
796            prev_id: version_delta.prev_id,
797            group_deltas: version_delta
798                .group_deltas
799                .iter()
800                .map(|(group_id, deltas)| (*group_id, deltas.into()))
801                .collect(),
802            max_committed_epoch: version_delta.max_committed_epoch,
803            trivial_move: version_delta.trivial_move,
804            new_table_watermarks: version_delta
805                .new_table_watermarks
806                .iter()
807                .map(|(table_id, watermarks)| (*table_id, watermarks.into()))
808                .collect(),
809            removed_table_ids: version_delta.removed_table_ids.iter().copied().collect(),
810            change_log_delta: version_delta
811                .change_log_delta
812                .iter()
813                .map(|(table_id, log_delta)| (*table_id, log_delta.into()))
814                .collect(),
815            state_table_info_delta: version_delta.state_table_info_delta.clone(),
816            vector_index_delta: version_delta
817                .vector_index_delta
818                .iter()
819                .map(|(table_id, delta)| (*table_id, delta.clone().into()))
820                .collect(),
821        }
822    }
823}
824
825impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
826where
827    PbSstableInfo: From<T>,
828{
829    fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
830        #[expect(deprecated)]
831        Self {
832            id: version_delta.id,
833            prev_id: version_delta.prev_id,
834            group_deltas: version_delta
835                .group_deltas
836                .into_iter()
837                .map(|(group_id, deltas)| (group_id, deltas.into()))
838                .collect(),
839            max_committed_epoch: version_delta.max_committed_epoch,
840            trivial_move: version_delta.trivial_move,
841            new_table_watermarks: version_delta
842                .new_table_watermarks
843                .into_iter()
844                .map(|(table_id, watermarks)| (table_id, watermarks.into()))
845                .collect(),
846            removed_table_ids: version_delta.removed_table_ids.into_iter().collect(),
847            change_log_delta: version_delta
848                .change_log_delta
849                .into_iter()
850                .map(|(table_id, log_delta)| (table_id, log_delta.into()))
851                .collect(),
852            state_table_info_delta: version_delta.state_table_info_delta,
853            vector_index_delta: version_delta
854                .vector_index_delta
855                .into_iter()
856                .map(|(table_id, delta)| (table_id, delta.into()))
857                .collect(),
858        }
859    }
860}
861
862impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
863where
864    T: From<PbSstableInfo>,
865{
866    fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
867        #[expect(deprecated)]
868        Self {
869            id: pb_version_delta.id,
870            prev_id: pb_version_delta.prev_id,
871            group_deltas: pb_version_delta
872                .group_deltas
873                .into_iter()
874                .map(|(group_id, deltas)| (group_id, deltas.into()))
875                .collect(),
876            max_committed_epoch: pb_version_delta.max_committed_epoch,
877            trivial_move: pb_version_delta.trivial_move,
878            new_table_watermarks: pb_version_delta
879                .new_table_watermarks
880                .into_iter()
881                .map(|(table_id, watermarks)| (table_id, watermarks.into()))
882                .collect(),
883            removed_table_ids: pb_version_delta.removed_table_ids.into_iter().collect(),
884            change_log_delta: pb_version_delta
885                .change_log_delta
886                .into_iter()
887                .map(|(table_id, log_delta)| (table_id, log_delta.into()))
888                .collect(),
889            state_table_info_delta: pb_version_delta.state_table_info_delta,
890            vector_index_delta: pb_version_delta
891                .vector_index_delta
892                .into_iter()
893                .map(|(table_id, delta)| (table_id, delta.into()))
894                .collect(),
895        }
896    }
897}
898
/// A change applied within a single level: SST ids to remove plus SSTs to insert.
///
/// `T` is the SST payload type; [`IntraLevelDelta`] instantiates it with
/// [`SstableInfo`].
#[derive(Debug, PartialEq, Clone)]
pub struct IntraLevelDeltaCommon<T> {
    /// Index of the level this delta targets.
    pub level_idx: u32,
    /// Sub-level id within L0.
    /// NOTE(review): presumably only meaningful when `level_idx` is 0 — confirm.
    pub l0_sub_level_id: u64,
    /// Ids of the SSTs to remove. They are typed as `HummockSstableId`, despite the
    /// `table` in the field name.
    pub removed_table_ids: HashSet<HummockSstableId>,
    /// SSTs to insert into the target level.
    pub inserted_table_infos: Vec<T>,
    /// NOTE(review): vnode partition count for the target level — semantics not
    /// visible in this file section; confirm.
    pub vnode_partition_count: u32,
    /// NOTE(review): presumably the compaction-group version this delta was built
    /// against — confirm.
    pub compaction_group_version_id: u64,
}

/// [`IntraLevelDeltaCommon`] carrying full [`SstableInfo`] payloads.
pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
910
911impl IntraLevelDelta {
912    pub fn estimated_encode_len(&self) -> usize {
913        size_of::<u32>()
914            + size_of::<u64>()
915            + self.removed_table_ids.len() * size_of::<u32>()
916            + self
917                .inserted_table_infos
918                .iter()
919                .map(|sst| sst.estimated_encode_len())
920                .sum::<usize>()
921            + size_of::<u32>()
922    }
923}
924
925impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
926where
927    T: From<PbSstableInfo>,
928{
929    fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
930        Self {
931            level_idx: pb_intra_level_delta.level_idx,
932            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
933            removed_table_ids: HashSet::from_iter(
934                pb_intra_level_delta.removed_table_ids.iter().copied(),
935            ),
936            inserted_table_infos: pb_intra_level_delta
937                .inserted_table_infos
938                .into_iter()
939                .map(Into::into)
940                .collect_vec(),
941            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
942            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
943        }
944    }
945}
946
947impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
948where
949    PbSstableInfo: From<T>,
950{
951    fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
952        Self {
953            level_idx: intra_level_delta.level_idx,
954            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
955            removed_table_ids: intra_level_delta.removed_table_ids.into_iter().collect(),
956            inserted_table_infos: intra_level_delta
957                .inserted_table_infos
958                .into_iter()
959                .map(Into::into)
960                .collect_vec(),
961            vnode_partition_count: intra_level_delta.vnode_partition_count,
962            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
963        }
964    }
965}
966
967impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
968where
969    PbSstableInfo: for<'a> From<&'a T>,
970{
971    fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
972        Self {
973            level_idx: intra_level_delta.level_idx,
974            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
975            removed_table_ids: intra_level_delta
976                .removed_table_ids
977                .iter()
978                .copied()
979                .collect(),
980            inserted_table_infos: intra_level_delta
981                .inserted_table_infos
982                .iter()
983                .map(Into::into)
984                .collect_vec(),
985            vnode_partition_count: intra_level_delta.vnode_partition_count,
986            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
987        }
988    }
989}
990
991impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
992where
993    T: for<'a> From<&'a PbSstableInfo>,
994{
995    fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
996        Self {
997            level_idx: pb_intra_level_delta.level_idx,
998            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
999            removed_table_ids: HashSet::from_iter(
1000                pb_intra_level_delta.removed_table_ids.iter().copied(),
1001            ),
1002            inserted_table_infos: pb_intra_level_delta
1003                .inserted_table_infos
1004                .iter()
1005                .map(Into::into)
1006                .collect_vec(),
1007            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
1008            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
1009        }
1010    }
1011}
1012
1013impl IntraLevelDelta {
1014    pub fn new(
1015        level_idx: u32,
1016        l0_sub_level_id: u64,
1017        removed_table_ids: HashSet<HummockSstableId>,
1018        inserted_table_infos: Vec<SstableInfo>,
1019        vnode_partition_count: u32,
1020        compaction_group_version_id: u64,
1021    ) -> Self {
1022        Self {
1023            level_idx,
1024            l0_sub_level_id,
1025            removed_table_ids,
1026            inserted_table_infos,
1027            vnode_partition_count,
1028            compaction_group_version_id,
1029        }
1030    }
1031}
1032
/// One change applied to a compaction group.
///
/// Each variant corresponds to one case of the protobuf `DeltaType` oneof. `T` is
/// the SST payload type; [`GroupDelta`] instantiates it with [`SstableInfo`].
#[derive(Debug, PartialEq, Clone)]
pub enum GroupDeltaCommon<T> {
    /// SSTs forming a new L0 sub-level.
    NewL0SubLevel(Vec<T>),
    /// SST removals and insertions within a single level.
    IntraLevel(IntraLevelDeltaCommon<T>),
    /// Protobuf group-construct payload, stored boxed.
    /// NOTE(review): boxing presumably keeps the enum small — confirm.
    GroupConstruct(Box<PbGroupConstruct>),
    /// Protobuf group-destroy payload (a `Copy` type).
    GroupDestroy(PbGroupDestroy),
    /// Protobuf group-merge payload (a `Copy` type).
    GroupMerge(PbGroupMerge),
    /// Ids of the tables to truncate.
    TruncateTables(HashSet<TableId>),
}

/// [`GroupDeltaCommon`] carrying full [`SstableInfo`] payloads.
pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
1044
1045impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
1046where
1047    T: From<PbSstableInfo>,
1048{
1049    fn from(pb_group_delta: PbGroupDelta) -> Self {
1050        match pb_group_delta.delta_type {
1051            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1052                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1053            }
1054            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1055                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
1056            }
1057            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1058                GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1059            }
1060            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1061                GroupDeltaCommon::GroupMerge(pb_group_merge)
1062            }
1063            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1064                pb_new_sub_level
1065                    .inserted_table_infos
1066                    .into_iter()
1067                    .map(T::from)
1068                    .collect(),
1069            ),
1070            Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1071                GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids.into_iter().collect())
1072            }
1073
1074            None => panic!("delta_type is not set"),
1075        }
1076    }
1077}
1078
1079impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1080where
1081    PbSstableInfo: From<T>,
1082{
1083    fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1084        match group_delta {
1085            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1086                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1087            },
1088            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1089                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1090            },
1091            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1092                delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1093            },
1094            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1095                delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1096            },
1097            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1098                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1099                    inserted_table_infos: new_sub_level
1100                        .into_iter()
1101                        .map(PbSstableInfo::from)
1102                        .collect(),
1103                })),
1104            },
1105            GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1106                delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1107                    table_ids: table_ids.iter().copied().collect(),
1108                })),
1109            },
1110        }
1111    }
1112}
1113
1114impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1115where
1116    PbSstableInfo: for<'a> From<&'a T>,
1117{
1118    fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1119        match group_delta {
1120            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1121                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1122            },
1123            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1124                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1125            },
1126            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1127                delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1128            },
1129            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1130                delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1131            },
1132            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1133                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1134                    inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1135                })),
1136            },
1137            GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1138                delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1139                    table_ids: table_ids.iter().copied().collect(),
1140                })),
1141            },
1142        }
1143    }
1144}
1145
1146impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1147where
1148    T: for<'a> From<&'a PbSstableInfo>,
1149{
1150    fn from(pb_group_delta: &PbGroupDelta) -> Self {
1151        match &pb_group_delta.delta_type {
1152            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1153                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1154            }
1155            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1156                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1157            }
1158            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1159                GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1160            }
1161            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1162                GroupDeltaCommon::GroupMerge(*pb_group_merge)
1163            }
1164            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1165                pb_new_sub_level
1166                    .inserted_table_infos
1167                    .iter()
1168                    .map(T::from)
1169                    .collect(),
1170            ),
1171            Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1172                GroupDeltaCommon::TruncateTables(
1173                    pb_truncate_tables.table_ids.iter().copied().collect(),
1174                )
1175            }
1176            None => panic!("delta_type is not set"),
1177        }
1178    }
1179}
1180
1181#[derive(Debug, PartialEq, Clone)]
1182pub struct GroupDeltasCommon<T> {
1183    pub group_deltas: Vec<GroupDeltaCommon<T>>,
1184}
1185
1186impl<T> Default for GroupDeltasCommon<T> {
1187    fn default() -> Self {
1188        Self {
1189            group_deltas: vec![],
1190        }
1191    }
1192}
1193
1194pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1195
1196impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1197where
1198    T: From<PbSstableInfo>,
1199{
1200    fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1201        Self {
1202            group_deltas: pb_group_deltas
1203                .group_deltas
1204                .into_iter()
1205                .map(GroupDeltaCommon::from)
1206                .collect_vec(),
1207        }
1208    }
1209}
1210
1211impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1212where
1213    PbSstableInfo: From<T>,
1214{
1215    fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1216        Self {
1217            group_deltas: group_deltas
1218                .group_deltas
1219                .into_iter()
1220                .map(|group_delta| group_delta.into())
1221                .collect_vec(),
1222        }
1223    }
1224}
1225
1226impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1227where
1228    PbSstableInfo: for<'a> From<&'a T>,
1229{
1230    fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1231        Self {
1232            group_deltas: group_deltas
1233                .group_deltas
1234                .iter()
1235                .map(|group_delta| group_delta.into())
1236                .collect_vec(),
1237        }
1238    }
1239}
1240
1241impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1242where
1243    T: for<'a> From<&'a PbSstableInfo>,
1244{
1245    fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1246        Self {
1247            group_deltas: pb_group_deltas
1248                .group_deltas
1249                .iter()
1250                .map(GroupDeltaCommon::from)
1251                .collect_vec(),
1252        }
1253    }
1254}
1255
1256impl<T> GroupDeltasCommon<T>
1257where
1258    PbSstableInfo: for<'a> From<&'a T>,
1259{
1260    pub fn to_protobuf(&self) -> PbGroupDeltas {
1261        self.into()
1262    }
1263}
1264
1265impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1266    #[expect(deprecated)]
1267    fn from(delta: HummockVersionDelta) -> Self {
1268        Self {
1269            id: delta.id,
1270            prev_id: delta.prev_id,
1271            group_deltas: delta.group_deltas,
1272            max_committed_epoch: delta.max_committed_epoch,
1273            trivial_move: delta.trivial_move,
1274            new_table_watermarks: delta.new_table_watermarks,
1275            removed_table_ids: delta.removed_table_ids,
1276            change_log_delta: delta
1277                .change_log_delta
1278                .into_iter()
1279                .map(|(k, v)| {
1280                    (
1281                        k,
1282                        ChangeLogDeltaCommon {
1283                            truncate_epoch: v.truncate_epoch,
1284                            new_log: EpochNewChangeLogCommon {
1285                                new_value: Vec::new(),
1286                                old_value: Vec::new(),
1287                                non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1288                                checkpoint_epoch: v.new_log.checkpoint_epoch,
1289                            },
1290                        },
1291                    )
1292                })
1293                .collect(),
1294            state_table_info_delta: delta.state_table_info_delta,
1295            vector_index_delta: delta.vector_index_delta,
1296        }
1297    }
1298}
1299
1300impl From<HummockVersion> for LocalHummockVersion {
1301    #[expect(deprecated)]
1302    fn from(version: HummockVersion) -> Self {
1303        Self {
1304            id: version.id,
1305            levels: version.levels,
1306            max_committed_epoch: version.max_committed_epoch,
1307            table_watermarks: version.table_watermarks,
1308            table_change_log: version
1309                .table_change_log
1310                .into_iter()
1311                .map(|(k, v)| {
1312                    let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1313                        .change_log_into_iter()
1314                        .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1315                            new_value: Vec::new(),
1316                            old_value: Vec::new(),
1317                            non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1318                            checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1319                        })
1320                        .collect();
1321                    (k, TableChangeLogCommon::new(epoch_new_change_logs))
1322                })
1323                .collect(),
1324            state_table_info: version.state_table_info,
1325            vector_indexes: version.vector_indexes,
1326        }
1327    }
1328}