risingwave_hummock_sdk/
version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{
30    ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
31};
32use crate::compaction_group::StaticCompactionGroupId;
33use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
34use crate::level::LevelsCommon;
35use crate::sstable_info::SstableInfo;
36use crate::table_watermark::TableWatermarks;
37use crate::vector_index::{VectorIndex, VectorIndexDelta};
38use crate::{
39    CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
40    HummockSstableObjectId, HummockVersionId,
41};
42
43#[derive(Debug, Clone, PartialEq)]
44pub struct HummockVersionStateTableInfo {
45    state_table_info: HashMap<TableId, PbStateTableInfo>,
46
47    // in memory index
48    compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
49}
50
51impl HummockVersionStateTableInfo {
52    pub fn empty() -> Self {
53        Self {
54            state_table_info: HashMap::new(),
55            compaction_group_member_tables: HashMap::new(),
56        }
57    }
58
59    fn build_compaction_group_member_tables(
60        state_table_info: &HashMap<TableId, PbStateTableInfo>,
61    ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
62        let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
63        for (table_id, info) in state_table_info {
64            assert!(
65                ret.entry(info.compaction_group_id)
66                    .or_default()
67                    .insert(*table_id)
68            );
69        }
70        ret
71    }
72
73    pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
74        self.state_table_info
75            .iter()
76            .map(|(table_id, info)| (*table_id, info.compaction_group_id))
77            .collect()
78    }
79
80    pub fn from_protobuf(state_table_info: &HashMap<TableId, PbStateTableInfo>) -> Self {
81        let state_table_info = state_table_info
82            .iter()
83            .map(|(table_id, info)| (*table_id, *info))
84            .collect();
85        let compaction_group_member_tables =
86            Self::build_compaction_group_member_tables(&state_table_info);
87        Self {
88            state_table_info,
89            compaction_group_member_tables,
90        }
91    }
92
93    pub fn apply_delta(
94        &mut self,
95        delta: &HashMap<TableId, StateTableInfoDelta>,
96        removed_table_id: &HashSet<TableId>,
97    ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
98        let mut changed_table = HashMap::new();
99        let mut has_bumped_committed_epoch = false;
100        fn remove_table_from_compaction_group(
101            compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
102            compaction_group_id: CompactionGroupId,
103            table_id: TableId,
104        ) {
105            let member_tables = compaction_group_member_tables
106                .get_mut(&compaction_group_id)
107                .expect("should exist");
108            assert!(member_tables.remove(&table_id));
109            if member_tables.is_empty() {
110                assert!(
111                    compaction_group_member_tables
112                        .remove(&compaction_group_id)
113                        .is_some()
114                );
115            }
116        }
117        for table_id in removed_table_id {
118            if let Some(prev_info) = self.state_table_info.remove(table_id) {
119                remove_table_from_compaction_group(
120                    &mut self.compaction_group_member_tables,
121                    prev_info.compaction_group_id,
122                    *table_id,
123                );
124                assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
125            } else {
126                warn!(
127                    %table_id,
128                    "table to remove does not exist"
129                );
130            }
131        }
132        for (table_id, delta) in delta {
133            if removed_table_id.contains(table_id) {
134                continue;
135            }
136            let new_info = StateTableInfo {
137                committed_epoch: delta.committed_epoch,
138                compaction_group_id: delta.compaction_group_id,
139            };
140            match self.state_table_info.entry(*table_id) {
141                Entry::Occupied(mut entry) => {
142                    let prev_info = entry.get_mut();
143                    assert!(
144                        new_info.committed_epoch >= prev_info.committed_epoch,
145                        "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
146                        table_id,
147                        prev_info,
148                        new_info
149                    );
150                    if new_info.committed_epoch > prev_info.committed_epoch {
151                        has_bumped_committed_epoch = true;
152                    }
153                    if prev_info.compaction_group_id != new_info.compaction_group_id {
154                        // table moved to another compaction group
155                        remove_table_from_compaction_group(
156                            &mut self.compaction_group_member_tables,
157                            prev_info.compaction_group_id,
158                            *table_id,
159                        );
160                        assert!(
161                            self.compaction_group_member_tables
162                                .entry(new_info.compaction_group_id)
163                                .or_default()
164                                .insert(*table_id)
165                        );
166                    }
167                    let prev_info = replace(prev_info, new_info);
168                    changed_table.insert(*table_id, Some(prev_info));
169                }
170                Entry::Vacant(entry) => {
171                    assert!(
172                        self.compaction_group_member_tables
173                            .entry(new_info.compaction_group_id)
174                            .or_default()
175                            .insert(*table_id)
176                    );
177                    has_bumped_committed_epoch = true;
178                    entry.insert(new_info);
179                    changed_table.insert(*table_id, None);
180                }
181            }
182        }
183        debug_assert_eq!(
184            self.compaction_group_member_tables,
185            Self::build_compaction_group_member_tables(&self.state_table_info)
186        );
187        (changed_table, has_bumped_committed_epoch)
188    }
189
190    pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
191        &self.state_table_info
192    }
193
194    pub fn compaction_group_member_table_ids(
195        &self,
196        compaction_group_id: CompactionGroupId,
197    ) -> &BTreeSet<TableId> {
198        static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
199        self.compaction_group_member_tables
200            .get(&compaction_group_id)
201            .unwrap_or_else(|| EMPTY_SET.deref())
202    }
203
204    pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
205        &self.compaction_group_member_tables
206    }
207
208    pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
209        self.state_table_info
210            .values()
211            .map(|info| info.committed_epoch)
212            .max()
213    }
214}
215
216#[derive(Debug, Clone, PartialEq)]
217pub struct HummockVersionCommon<T, L = T> {
218    pub id: HummockVersionId,
219    pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
220    #[deprecated]
221    pub(crate) max_committed_epoch: u64,
222    pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
223    pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
224    pub state_table_info: HummockVersionStateTableInfo,
225    pub vector_indexes: HashMap<TableId, VectorIndex>,
226}
227
228pub type HummockVersion = HummockVersionCommon<SstableInfo>;
229
230pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
231
232impl Default for HummockVersion {
233    fn default() -> Self {
234        HummockVersion::from(&PbHummockVersion::default())
235    }
236}
237
238impl<T> HummockVersionCommon<T>
239where
240    T: for<'a> From<&'a PbSstableInfo>,
241    PbSstableInfo: for<'a> From<&'a T>,
242{
243    /// Convert the `PbHummockVersion` received from rpc to `HummockVersion`. No need to
244    /// maintain backward compatibility.
245    pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
246        pb_version.into()
247    }
248
249    /// Convert the `PbHummockVersion` deserialized from persisted state to `HummockVersion`.
250    /// We should maintain backward compatibility.
251    pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
252        pb_version.into()
253    }
254
255    pub fn to_protobuf(&self) -> PbHummockVersion {
256        self.into()
257    }
258}
259
260impl HummockVersion {
261    pub fn estimated_encode_len(&self) -> usize {
262        self.levels.len() * size_of::<CompactionGroupId>()
263            + self
264                .levels
265                .values()
266                .map(|level| level.estimated_encode_len())
267                .sum::<usize>()
268            + self.table_watermarks.len() * size_of::<u32>()
269            + self
270                .table_watermarks
271                .values()
272                .map(|table_watermark| table_watermark.estimated_encode_len())
273                .sum::<usize>()
274            + self
275                .table_change_log
276                .values()
277                .map(|c| {
278                    c.iter()
279                        .map(|l| {
280                            l.old_value
281                                .iter()
282                                .chain(l.new_value.iter())
283                                .map(|s| s.estimated_encode_len())
284                                .sum::<usize>()
285                        })
286                        .sum::<usize>()
287                })
288                .sum::<usize>()
289    }
290}
291
292impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
293where
294    T: for<'a> From<&'a PbSstableInfo>,
295{
296    fn from(pb_version: &PbHummockVersion) -> Self {
297        #[expect(deprecated)]
298        Self {
299            id: HummockVersionId(pb_version.id),
300            levels: pb_version
301                .levels
302                .iter()
303                .map(|(group_id, levels)| {
304                    (*group_id as CompactionGroupId, LevelsCommon::from(levels))
305                })
306                .collect(),
307            max_committed_epoch: pb_version.max_committed_epoch,
308            table_watermarks: pb_version
309                .table_watermarks
310                .iter()
311                .map(|(table_id, table_watermark)| {
312                    (*table_id, Arc::new(TableWatermarks::from(table_watermark)))
313                })
314                .collect(),
315            table_change_log: pb_version
316                .table_change_logs
317                .iter()
318                .map(|(table_id, change_log)| {
319                    (*table_id, TableChangeLogCommon::from_protobuf(change_log))
320                })
321                .collect(),
322            state_table_info: HummockVersionStateTableInfo::from_protobuf(
323                &pb_version.state_table_info,
324            ),
325            vector_indexes: pb_version
326                .vector_indexes
327                .iter()
328                .map(|(table_id, index)| (*table_id, index.clone().into()))
329                .collect(),
330        }
331    }
332}
333
334impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
335where
336    PbSstableInfo: for<'a> From<&'a T>,
337{
338    fn from(version: &HummockVersionCommon<T>) -> Self {
339        #[expect(deprecated)]
340        Self {
341            id: version.id.0,
342            levels: version
343                .levels
344                .iter()
345                .map(|(group_id, levels)| (*group_id as _, levels.into()))
346                .collect(),
347            max_committed_epoch: version.max_committed_epoch,
348            table_watermarks: version
349                .table_watermarks
350                .iter()
351                .map(|(table_id, watermark)| (*table_id, watermark.as_ref().into()))
352                .collect(),
353            table_change_logs: version
354                .table_change_log
355                .iter()
356                .map(|(table_id, change_log)| (*table_id, change_log.to_protobuf()))
357                .collect(),
358            state_table_info: version.state_table_info.state_table_info.clone(),
359            vector_indexes: version
360                .vector_indexes
361                .iter()
362                .map(|(table_id, index)| (*table_id, index.clone().into()))
363                .collect(),
364        }
365    }
366}
367
368impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
369where
370    PbSstableInfo: From<T>,
371    PbSstableInfo: for<'a> From<&'a T>,
372{
373    fn from(version: HummockVersionCommon<T>) -> Self {
374        #[expect(deprecated)]
375        Self {
376            id: version.id.0,
377            levels: version
378                .levels
379                .into_iter()
380                .map(|(group_id, levels)| (group_id as _, levels.into()))
381                .collect(),
382            max_committed_epoch: version.max_committed_epoch,
383            table_watermarks: version
384                .table_watermarks
385                .into_iter()
386                .map(|(table_id, watermark)| (table_id, watermark.as_ref().into()))
387                .collect(),
388            table_change_logs: version
389                .table_change_log
390                .into_iter()
391                .map(|(table_id, change_log)| (table_id, change_log.to_protobuf()))
392                .collect(),
393            state_table_info: version.state_table_info.state_table_info.clone(),
394            vector_indexes: version
395                .vector_indexes
396                .into_iter()
397                .map(|(table_id, index)| (table_id, index.into()))
398                .collect(),
399        }
400    }
401}
402
403impl HummockVersion {
404    pub fn next_version_id(&self) -> HummockVersionId {
405        self.id.next()
406    }
407
408    pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
409        // for backward-compatibility of previous hummock version delta
410        self.state_table_info.state_table_info.is_empty()
411            && self.levels.values().any(|group| {
412                // state_table_info is not previously filled, but there previously exists some tables
413                #[expect(deprecated)]
414                !group.member_table_ids.is_empty()
415            })
416    }
417
418    pub fn may_fill_backward_compatible_state_table_info_delta(
419        &self,
420        delta: &mut HummockVersionDelta,
421    ) {
422        #[expect(deprecated)]
423        // for backward-compatibility of previous hummock version delta
424        for (cg_id, group) in &self.levels {
425            for table_id in &group.member_table_ids {
426                assert!(
427                    delta
428                        .state_table_info_delta
429                        .insert(
430                            (*table_id).into(),
431                            StateTableInfoDelta {
432                                committed_epoch: self.max_committed_epoch,
433                                compaction_group_id: *cg_id,
434                            }
435                        )
436                        .is_none(),
437                    "duplicate table id {} in cg {}",
438                    table_id,
439                    cg_id
440                );
441            }
442        }
443    }
444
445    pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
446        #[expect(deprecated)]
447        let mut init_version = HummockVersion {
448            id: FIRST_VERSION_ID,
449            levels: Default::default(),
450            max_committed_epoch: INVALID_EPOCH,
451            table_watermarks: HashMap::new(),
452            table_change_log: HashMap::new(),
453            state_table_info: HummockVersionStateTableInfo::empty(),
454            vector_indexes: Default::default(),
455        };
456        for group_id in [
457            StaticCompactionGroupId::StateDefault as CompactionGroupId,
458            StaticCompactionGroupId::MaterializedView as CompactionGroupId,
459        ] {
460            init_version.levels.insert(
461                group_id,
462                build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
463            );
464        }
465        init_version
466    }
467
468    pub fn version_delta_after(&self) -> HummockVersionDelta {
469        #[expect(deprecated)]
470        HummockVersionDelta {
471            id: self.next_version_id(),
472            prev_id: self.id,
473            trivial_move: false,
474            max_committed_epoch: self.max_committed_epoch,
475            group_deltas: Default::default(),
476            new_table_watermarks: HashMap::new(),
477            removed_table_ids: HashSet::new(),
478            change_log_delta: HashMap::new(),
479            state_table_info_delta: Default::default(),
480            vector_index_delta: Default::default(),
481        }
482    }
483
484    pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
485        let table_change_log = {
486            let mut table_change_log = HashMap::new();
487            for (table_id, log) in &mut self.table_change_log {
488                let change_log_iter =
489                    log.change_log_iter_mut()
490                        .map(|item| EpochNewChangeLogCommon {
491                            new_value: std::mem::take(&mut item.new_value),
492                            old_value: std::mem::take(&mut item.old_value),
493                            non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
494                            checkpoint_epoch: item.checkpoint_epoch,
495                        });
496                table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
497            }
498
499            table_change_log
500        };
501
502        let local_version = LocalHummockVersion::from(self);
503
504        (local_version, table_change_log)
505    }
506}
507
508impl<T, L> HummockVersionCommon<T, L> {
509    pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
510        self.state_table_info
511            .info()
512            .get(&table_id)
513            .map(|info| info.committed_epoch)
514    }
515}
516
517#[derive(Debug, PartialEq, Clone)]
518pub struct HummockVersionDeltaCommon<T, L = T> {
519    pub id: HummockVersionId,
520    pub prev_id: HummockVersionId,
521    pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
522    #[deprecated]
523    pub(crate) max_committed_epoch: u64,
524    pub trivial_move: bool,
525    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
526    pub removed_table_ids: HashSet<TableId>,
527    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
528    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
529    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
530}
531
532pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;
533
534pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
535
536impl Default for HummockVersionDelta {
537    fn default() -> Self {
538        HummockVersionDelta::from(&PbHummockVersionDelta::default())
539    }
540}
541
542impl<T> HummockVersionDeltaCommon<T>
543where
544    T: for<'a> From<&'a PbSstableInfo>,
545    PbSstableInfo: for<'a> From<&'a T>,
546{
547    /// Convert the `PbHummockVersionDelta` deserialized from persisted state to `HummockVersionDelta`.
548    /// We should maintain backward compatibility.
549    pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
550        delta.into()
551    }
552
553    /// Convert the `PbHummockVersionDelta` received from rpc to `HummockVersionDelta`. No need to
554    /// maintain backward compatibility.
555    pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
556        delta.into()
557    }
558
559    pub fn to_protobuf(&self) -> PbHummockVersionDelta {
560        self.into()
561    }
562}
563
564pub trait SstableIdReader {
565    fn sst_id(&self) -> HummockSstableId;
566}
567
568pub trait ObjectIdReader {
569    fn object_id(&self) -> HummockSstableObjectId;
570}
571
572impl<T> HummockVersionDeltaCommon<T>
573where
574    T: SstableIdReader + ObjectIdReader,
575{
576    /// Get the newly added object ids from the version delta.
577    ///
578    /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`,
579    /// but it is possible that the object is moved or split from other compaction groups or levels.
580    pub fn newly_added_object_ids(
581        &self,
582        exclude_table_change_log: bool,
583    ) -> HashSet<HummockObjectId> {
584        // DO NOT REMOVE THIS LINE
585        // This is to ensure that when adding new variant to `HummockObjectId`,
586        // the compiler will warn us if we forget to handle it here.
587        match HummockObjectId::Sstable(0.into()) {
588            HummockObjectId::Sstable(_) => {}
589            HummockObjectId::VectorFile(_) => {}
590            HummockObjectId::HnswGraphFile(_) => {}
591        };
592        self.newly_added_sst_infos(exclude_table_change_log)
593            .map(|sst| HummockObjectId::Sstable(sst.object_id()))
594            .chain(
595                self.vector_index_delta
596                    .values()
597                    .flat_map(|vector_index_delta| {
598                        vector_index_delta
599                            .newly_added_objects()
600                            .map(|(object_id, _)| object_id)
601                    }),
602            )
603            .collect()
604    }
605
606    pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
607        self.newly_added_sst_infos(exclude_table_change_log)
608            .map(|sst| sst.sst_id())
609            .collect()
610    }
611}
612
613impl<T> HummockVersionDeltaCommon<T> {
614    pub fn newly_added_sst_infos(
615        &self,
616        exclude_table_change_log: bool,
617    ) -> impl Iterator<Item = &'_ T> {
618        let may_table_change_delta = if exclude_table_change_log {
619            None
620        } else {
621            Some(self.change_log_delta.values())
622        };
623        self.group_deltas
624            .values()
625            .flat_map(|group_deltas| {
626                group_deltas.group_deltas.iter().flat_map(|group_delta| {
627                    let sst_slice = match &group_delta {
628                        GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
629                        | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
630                            inserted_table_infos,
631                            ..
632                        }) => Some(inserted_table_infos.iter()),
633                        GroupDeltaCommon::GroupConstruct(_)
634                        | GroupDeltaCommon::GroupDestroy(_)
635                        | GroupDeltaCommon::GroupMerge(_)
636                        | GroupDeltaCommon::TruncateTables(_) => None,
637                    };
638                    sst_slice.into_iter().flatten()
639                })
640            })
641            .chain(
642                may_table_change_delta
643                    .map(|v| {
644                        v.flat_map(|delta| {
645                            let new_log = &delta.new_log;
646                            new_log.new_value.iter().chain(new_log.old_value.iter())
647                        })
648                    })
649                    .into_iter()
650                    .flatten(),
651            )
652    }
653}
654
655impl HummockVersionDelta {
656    #[expect(deprecated)]
657    pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
658        self.max_committed_epoch
659    }
660}
661
662impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
663where
664    T: for<'a> From<&'a PbSstableInfo>,
665{
666    fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
667        #[expect(deprecated)]
668        Self {
669            id: HummockVersionId(pb_version_delta.id),
670            prev_id: HummockVersionId(pb_version_delta.prev_id),
671            group_deltas: pb_version_delta
672                .group_deltas
673                .iter()
674                .map(|(group_id, deltas)| {
675                    (
676                        *group_id as CompactionGroupId,
677                        GroupDeltasCommon::from(deltas),
678                    )
679                })
680                .collect(),
681            max_committed_epoch: pb_version_delta.max_committed_epoch,
682            trivial_move: pb_version_delta.trivial_move,
683            new_table_watermarks: pb_version_delta
684                .new_table_watermarks
685                .iter()
686                .map(|(table_id, watermarks)| (*table_id, TableWatermarks::from(watermarks)))
687                .collect(),
688            removed_table_ids: pb_version_delta.removed_table_ids.iter().copied().collect(),
689            change_log_delta: pb_version_delta
690                .change_log_delta
691                .iter()
692                .map(|(table_id, log_delta)| {
693                    (
694                        *table_id,
695                        ChangeLogDeltaCommon {
696                            truncate_epoch: log_delta.truncate_epoch,
697                            new_log: log_delta.new_log.as_ref().unwrap().into(),
698                        },
699                    )
700                })
701                .collect(),
702
703            state_table_info_delta: pb_version_delta
704                .state_table_info_delta
705                .iter()
706                .map(|(table_id, delta)| (*table_id, *delta))
707                .collect(),
708            vector_index_delta: pb_version_delta
709                .vector_index_delta
710                .iter()
711                .map(|(table_id, delta)| (*table_id, delta.clone().into()))
712                .collect(),
713        }
714    }
715}
716
717impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
718where
719    PbSstableInfo: for<'a> From<&'a T>,
720{
721    fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
722        #[expect(deprecated)]
723        Self {
724            id: version_delta.id.0,
725            prev_id: version_delta.prev_id.0,
726            group_deltas: version_delta
727                .group_deltas
728                .iter()
729                .map(|(group_id, deltas)| (*group_id as _, deltas.into()))
730                .collect(),
731            max_committed_epoch: version_delta.max_committed_epoch,
732            trivial_move: version_delta.trivial_move,
733            new_table_watermarks: version_delta
734                .new_table_watermarks
735                .iter()
736                .map(|(table_id, watermarks)| (*table_id, watermarks.into()))
737                .collect(),
738            removed_table_ids: version_delta.removed_table_ids.iter().copied().collect(),
739            change_log_delta: version_delta
740                .change_log_delta
741                .iter()
742                .map(|(table_id, log_delta)| (*table_id, log_delta.into()))
743                .collect(),
744            state_table_info_delta: version_delta.state_table_info_delta.clone(),
745            vector_index_delta: version_delta
746                .vector_index_delta
747                .iter()
748                .map(|(table_id, delta)| (*table_id, delta.clone().into()))
749                .collect(),
750        }
751    }
752}
753
754impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
755where
756    PbSstableInfo: From<T>,
757{
758    fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
759        #[expect(deprecated)]
760        Self {
761            id: version_delta.id.0,
762            prev_id: version_delta.prev_id.0,
763            group_deltas: version_delta
764                .group_deltas
765                .into_iter()
766                .map(|(group_id, deltas)| (group_id as _, deltas.into()))
767                .collect(),
768            max_committed_epoch: version_delta.max_committed_epoch,
769            trivial_move: version_delta.trivial_move,
770            new_table_watermarks: version_delta
771                .new_table_watermarks
772                .into_iter()
773                .map(|(table_id, watermarks)| (table_id, watermarks.into()))
774                .collect(),
775            removed_table_ids: version_delta.removed_table_ids.into_iter().collect(),
776            change_log_delta: version_delta
777                .change_log_delta
778                .into_iter()
779                .map(|(table_id, log_delta)| (table_id, log_delta.into()))
780                .collect(),
781            state_table_info_delta: version_delta.state_table_info_delta.clone(),
782            vector_index_delta: version_delta
783                .vector_index_delta
784                .into_iter()
785                .map(|(table_id, delta)| (table_id, delta.into()))
786                .collect(),
787        }
788    }
789}
790
791impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
792where
793    T: From<PbSstableInfo>,
794{
795    fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
796        #[expect(deprecated)]
797        Self {
798            id: HummockVersionId(pb_version_delta.id),
799            prev_id: HummockVersionId(pb_version_delta.prev_id),
800            group_deltas: pb_version_delta
801                .group_deltas
802                .into_iter()
803                .map(|(group_id, deltas)| (group_id as CompactionGroupId, deltas.into()))
804                .collect(),
805            max_committed_epoch: pb_version_delta.max_committed_epoch,
806            trivial_move: pb_version_delta.trivial_move,
807            new_table_watermarks: pb_version_delta
808                .new_table_watermarks
809                .into_iter()
810                .map(|(table_id, watermarks)| (table_id, watermarks.into()))
811                .collect(),
812            removed_table_ids: pb_version_delta.removed_table_ids.into_iter().collect(),
813            change_log_delta: pb_version_delta
814                .change_log_delta
815                .iter()
816                .map(|(table_id, log_delta)| {
817                    (
818                        *table_id,
819                        ChangeLogDeltaCommon {
820                            new_log: log_delta.new_log.clone().unwrap().into(),
821                            truncate_epoch: log_delta.truncate_epoch,
822                        },
823                    )
824                })
825                .collect(),
826            state_table_info_delta: pb_version_delta
827                .state_table_info_delta
828                .iter()
829                .map(|(table_id, delta)| (*table_id, *delta))
830                .collect(),
831            vector_index_delta: pb_version_delta
832                .vector_index_delta
833                .into_iter()
834                .map(|(table_id, delta)| (table_id, delta.into()))
835                .collect(),
836        }
837    }
838}
839
/// In-memory counterpart of the protobuf `PbIntraLevelDelta`, generic over the
/// SST info representation `T`.
///
/// The conversions in this file copy the scalar fields verbatim between the
/// two representations and convert the SST-related collections element by
/// element.
#[derive(Debug, PartialEq, Clone)]
pub struct IntraLevelDeltaCommon<T> {
    pub level_idx: u32,
    pub l0_sub_level_id: u64,
    /// Kept as a set here; the protobuf form carries the raw ids
    /// (`HummockSstableId::inner()`) in a repeated field.
    pub removed_table_ids: HashSet<HummockSstableId>,
    pub inserted_table_infos: Vec<T>,
    pub vnode_partition_count: u32,
    pub compaction_group_version_id: u64,
}

/// [`IntraLevelDeltaCommon`] specialised to the full [`SstableInfo`].
pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
851
852impl IntraLevelDelta {
853    pub fn estimated_encode_len(&self) -> usize {
854        size_of::<u32>()
855            + size_of::<u64>()
856            + self.removed_table_ids.len() * size_of::<u32>()
857            + self
858                .inserted_table_infos
859                .iter()
860                .map(|sst| sst.estimated_encode_len())
861                .sum::<usize>()
862            + size_of::<u32>()
863    }
864}
865
866impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
867where
868    T: From<PbSstableInfo>,
869{
870    fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
871        Self {
872            level_idx: pb_intra_level_delta.level_idx,
873            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
874            removed_table_ids: HashSet::from_iter(
875                pb_intra_level_delta
876                    .removed_table_ids
877                    .iter()
878                    .map(|sst_id| (*sst_id).into()),
879            ),
880            inserted_table_infos: pb_intra_level_delta
881                .inserted_table_infos
882                .into_iter()
883                .map(Into::into)
884                .collect_vec(),
885            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
886            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
887        }
888    }
889}
890
891impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
892where
893    PbSstableInfo: From<T>,
894{
895    fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
896        Self {
897            level_idx: intra_level_delta.level_idx,
898            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
899            removed_table_ids: intra_level_delta
900                .removed_table_ids
901                .into_iter()
902                .map(|sst_id| sst_id.inner())
903                .collect(),
904            inserted_table_infos: intra_level_delta
905                .inserted_table_infos
906                .into_iter()
907                .map(Into::into)
908                .collect_vec(),
909            vnode_partition_count: intra_level_delta.vnode_partition_count,
910            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
911        }
912    }
913}
914
915impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
916where
917    PbSstableInfo: for<'a> From<&'a T>,
918{
919    fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
920        Self {
921            level_idx: intra_level_delta.level_idx,
922            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
923            removed_table_ids: intra_level_delta
924                .removed_table_ids
925                .iter()
926                .map(|sst_id| sst_id.inner())
927                .collect(),
928            inserted_table_infos: intra_level_delta
929                .inserted_table_infos
930                .iter()
931                .map(Into::into)
932                .collect_vec(),
933            vnode_partition_count: intra_level_delta.vnode_partition_count,
934            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
935        }
936    }
937}
938
939impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
940where
941    T: for<'a> From<&'a PbSstableInfo>,
942{
943    fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
944        Self {
945            level_idx: pb_intra_level_delta.level_idx,
946            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
947            removed_table_ids: HashSet::from_iter(
948                pb_intra_level_delta
949                    .removed_table_ids
950                    .iter()
951                    .map(|sst_id| (*sst_id).into()),
952            ),
953            inserted_table_infos: pb_intra_level_delta
954                .inserted_table_infos
955                .iter()
956                .map(Into::into)
957                .collect_vec(),
958            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
959            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
960        }
961    }
962}
963
964impl IntraLevelDelta {
965    pub fn new(
966        level_idx: u32,
967        l0_sub_level_id: u64,
968        removed_table_ids: HashSet<HummockSstableId>,
969        inserted_table_infos: Vec<SstableInfo>,
970        vnode_partition_count: u32,
971        compaction_group_version_id: u64,
972    ) -> Self {
973        Self {
974            level_idx,
975            l0_sub_level_id,
976            removed_table_ids,
977            inserted_table_infos,
978            vnode_partition_count,
979            compaction_group_version_id,
980        }
981    }
982}
983
/// In-memory counterpart of the protobuf `PbGroupDelta`, generic over the SST
/// info representation `T`.
///
/// Each variant maps to one variant of the protobuf `PbDeltaType`; the
/// conversions in this file translate between the two in both directions.
#[derive(Debug, PartialEq, Clone)]
pub enum GroupDeltaCommon<T> {
    /// SSTs carried by the protobuf `PbNewL0SubLevel::inserted_table_infos`.
    NewL0SubLevel(Vec<T>),
    IntraLevel(IntraLevelDeltaCommon<T>),
    // NOTE(review): presumably boxed to keep the enum small — confirm.
    GroupConstruct(Box<PbGroupConstruct>),
    GroupDestroy(PbGroupDestroy),
    GroupMerge(PbGroupMerge),
    /// Table ids carried by the protobuf `PbTruncateTables::table_ids`.
    TruncateTables(HashSet<TableId>),
}

/// [`GroupDeltaCommon`] specialised to the full [`SstableInfo`].
pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
995
996impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
997where
998    T: From<PbSstableInfo>,
999{
1000    fn from(pb_group_delta: PbGroupDelta) -> Self {
1001        match pb_group_delta.delta_type {
1002            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1003                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1004            }
1005            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1006                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
1007            }
1008            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1009                GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1010            }
1011            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1012                GroupDeltaCommon::GroupMerge(pb_group_merge)
1013            }
1014            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1015                pb_new_sub_level
1016                    .inserted_table_infos
1017                    .into_iter()
1018                    .map(T::from)
1019                    .collect(),
1020            ),
1021            Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1022                GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids.into_iter().collect())
1023            }
1024
1025            None => panic!("delta_type is not set"),
1026        }
1027    }
1028}
1029
1030impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1031where
1032    PbSstableInfo: From<T>,
1033{
1034    fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1035        match group_delta {
1036            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1037                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1038            },
1039            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1040                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1041            },
1042            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1043                delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1044            },
1045            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1046                delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1047            },
1048            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1049                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1050                    inserted_table_infos: new_sub_level
1051                        .into_iter()
1052                        .map(PbSstableInfo::from)
1053                        .collect(),
1054                })),
1055            },
1056            GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1057                delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1058                    table_ids: table_ids.iter().copied().collect(),
1059                })),
1060            },
1061        }
1062    }
1063}
1064
1065impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1066where
1067    PbSstableInfo: for<'a> From<&'a T>,
1068{
1069    fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1070        match group_delta {
1071            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1072                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1073            },
1074            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1075                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1076            },
1077            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1078                delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1079            },
1080            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1081                delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1082            },
1083            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1084                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1085                    inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1086                })),
1087            },
1088            GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1089                delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1090                    table_ids: table_ids.iter().copied().collect(),
1091                })),
1092            },
1093        }
1094    }
1095}
1096
1097impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1098where
1099    T: for<'a> From<&'a PbSstableInfo>,
1100{
1101    fn from(pb_group_delta: &PbGroupDelta) -> Self {
1102        match &pb_group_delta.delta_type {
1103            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1104                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1105            }
1106            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1107                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1108            }
1109            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1110                GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1111            }
1112            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1113                GroupDeltaCommon::GroupMerge(*pb_group_merge)
1114            }
1115            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1116                pb_new_sub_level
1117                    .inserted_table_infos
1118                    .iter()
1119                    .map(T::from)
1120                    .collect(),
1121            ),
1122            Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1123                GroupDeltaCommon::TruncateTables(
1124                    pb_truncate_tables.table_ids.iter().copied().collect(),
1125                )
1126            }
1127            None => panic!("delta_type is not set"),
1128        }
1129    }
1130}
1131
1132#[derive(Debug, PartialEq, Clone)]
1133pub struct GroupDeltasCommon<T> {
1134    pub group_deltas: Vec<GroupDeltaCommon<T>>,
1135}
1136
1137impl<T> Default for GroupDeltasCommon<T> {
1138    fn default() -> Self {
1139        Self {
1140            group_deltas: vec![],
1141        }
1142    }
1143}
1144
1145pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1146
1147impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1148where
1149    T: From<PbSstableInfo>,
1150{
1151    fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1152        Self {
1153            group_deltas: pb_group_deltas
1154                .group_deltas
1155                .into_iter()
1156                .map(GroupDeltaCommon::from)
1157                .collect_vec(),
1158        }
1159    }
1160}
1161
1162impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1163where
1164    PbSstableInfo: From<T>,
1165{
1166    fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1167        Self {
1168            group_deltas: group_deltas
1169                .group_deltas
1170                .into_iter()
1171                .map(|group_delta| group_delta.into())
1172                .collect_vec(),
1173        }
1174    }
1175}
1176
1177impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1178where
1179    PbSstableInfo: for<'a> From<&'a T>,
1180{
1181    fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1182        Self {
1183            group_deltas: group_deltas
1184                .group_deltas
1185                .iter()
1186                .map(|group_delta| group_delta.into())
1187                .collect_vec(),
1188        }
1189    }
1190}
1191
1192impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1193where
1194    T: for<'a> From<&'a PbSstableInfo>,
1195{
1196    fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1197        Self {
1198            group_deltas: pb_group_deltas
1199                .group_deltas
1200                .iter()
1201                .map(GroupDeltaCommon::from)
1202                .collect_vec(),
1203        }
1204    }
1205}
1206
1207impl<T> GroupDeltasCommon<T>
1208where
1209    PbSstableInfo: for<'a> From<&'a T>,
1210{
1211    pub fn to_protobuf(&self) -> PbGroupDeltas {
1212        self.into()
1213    }
1214}
1215
1216impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1217    #[expect(deprecated)]
1218    fn from(delta: HummockVersionDelta) -> Self {
1219        Self {
1220            id: delta.id,
1221            prev_id: delta.prev_id,
1222            group_deltas: delta.group_deltas,
1223            max_committed_epoch: delta.max_committed_epoch,
1224            trivial_move: delta.trivial_move,
1225            new_table_watermarks: delta.new_table_watermarks,
1226            removed_table_ids: delta.removed_table_ids,
1227            change_log_delta: delta
1228                .change_log_delta
1229                .into_iter()
1230                .map(|(k, v)| {
1231                    (
1232                        k,
1233                        ChangeLogDeltaCommon {
1234                            truncate_epoch: v.truncate_epoch,
1235                            new_log: EpochNewChangeLogCommon {
1236                                new_value: Vec::new(),
1237                                old_value: Vec::new(),
1238                                non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1239                                checkpoint_epoch: v.new_log.checkpoint_epoch,
1240                            },
1241                        },
1242                    )
1243                })
1244                .collect(),
1245            state_table_info_delta: delta.state_table_info_delta,
1246            vector_index_delta: delta.vector_index_delta,
1247        }
1248    }
1249}
1250
1251impl From<HummockVersion> for LocalHummockVersion {
1252    #[expect(deprecated)]
1253    fn from(version: HummockVersion) -> Self {
1254        Self {
1255            id: version.id,
1256            levels: version.levels,
1257            max_committed_epoch: version.max_committed_epoch,
1258            table_watermarks: version.table_watermarks,
1259            table_change_log: version
1260                .table_change_log
1261                .into_iter()
1262                .map(|(k, v)| {
1263                    let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1264                        .change_log_into_iter()
1265                        .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1266                            new_value: Vec::new(),
1267                            old_value: Vec::new(),
1268                            non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1269                            checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1270                        })
1271                        .collect();
1272                    (k, TableChangeLogCommon::new(epoch_new_change_logs))
1273                })
1274                .collect(),
1275            state_table_info: version.state_table_info,
1276            vector_indexes: version.vector_indexes,
1277        }
1278    }
1279}