risingwave_hummock_sdk/
version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::{
27    CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge,
28    PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbNewL0SubLevel, PbSstableInfo,
29    PbStateTableInfo, StateTableInfo, StateTableInfoDelta,
30};
31use tracing::warn;
32
33use crate::change_log::{
34    ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
35};
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
38use crate::level::LevelsCommon;
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::TableWatermarks;
41use crate::{
42    CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
43    HummockSstableObjectId, HummockVersionId,
44};
45
46#[derive(Debug, Clone, PartialEq)]
47pub struct HummockVersionStateTableInfo {
48    state_table_info: HashMap<TableId, PbStateTableInfo>,
49
50    // in memory index
51    compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
52}
53
54impl HummockVersionStateTableInfo {
55    pub fn empty() -> Self {
56        Self {
57            state_table_info: HashMap::new(),
58            compaction_group_member_tables: HashMap::new(),
59        }
60    }
61
62    fn build_compaction_group_member_tables(
63        state_table_info: &HashMap<TableId, PbStateTableInfo>,
64    ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
65        let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
66        for (table_id, info) in state_table_info {
67            assert!(
68                ret.entry(info.compaction_group_id)
69                    .or_default()
70                    .insert(*table_id)
71            );
72        }
73        ret
74    }
75
76    pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
77        self.state_table_info
78            .iter()
79            .map(|(table_id, info)| (*table_id, info.compaction_group_id))
80            .collect()
81    }
82
83    pub fn from_protobuf(state_table_info: &HashMap<u32, PbStateTableInfo>) -> Self {
84        let state_table_info = state_table_info
85            .iter()
86            .map(|(table_id, info)| (TableId::new(*table_id), *info))
87            .collect();
88        let compaction_group_member_tables =
89            Self::build_compaction_group_member_tables(&state_table_info);
90        Self {
91            state_table_info,
92            compaction_group_member_tables,
93        }
94    }
95
96    pub fn to_protobuf(&self) -> HashMap<u32, PbStateTableInfo> {
97        self.state_table_info
98            .iter()
99            .map(|(table_id, info)| (table_id.table_id, *info))
100            .collect()
101    }
102
103    pub fn apply_delta(
104        &mut self,
105        delta: &HashMap<TableId, StateTableInfoDelta>,
106        removed_table_id: &HashSet<TableId>,
107    ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
108        let mut changed_table = HashMap::new();
109        let mut has_bumped_committed_epoch = false;
110        fn remove_table_from_compaction_group(
111            compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
112            compaction_group_id: CompactionGroupId,
113            table_id: TableId,
114        ) {
115            let member_tables = compaction_group_member_tables
116                .get_mut(&compaction_group_id)
117                .expect("should exist");
118            assert!(member_tables.remove(&table_id));
119            if member_tables.is_empty() {
120                assert!(
121                    compaction_group_member_tables
122                        .remove(&compaction_group_id)
123                        .is_some()
124                );
125            }
126        }
127        for table_id in removed_table_id {
128            if let Some(prev_info) = self.state_table_info.remove(table_id) {
129                remove_table_from_compaction_group(
130                    &mut self.compaction_group_member_tables,
131                    prev_info.compaction_group_id,
132                    *table_id,
133                );
134                assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
135            } else {
136                warn!(
137                    table_id = table_id.table_id,
138                    "table to remove does not exist"
139                );
140            }
141        }
142        for (table_id, delta) in delta {
143            if removed_table_id.contains(table_id) {
144                continue;
145            }
146            let new_info = StateTableInfo {
147                committed_epoch: delta.committed_epoch,
148                compaction_group_id: delta.compaction_group_id,
149            };
150            match self.state_table_info.entry(*table_id) {
151                Entry::Occupied(mut entry) => {
152                    let prev_info = entry.get_mut();
153                    assert!(
154                        new_info.committed_epoch >= prev_info.committed_epoch,
155                        "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
156                        table_id.table_id,
157                        prev_info,
158                        new_info
159                    );
160                    if new_info.committed_epoch > prev_info.committed_epoch {
161                        has_bumped_committed_epoch = true;
162                    }
163                    if prev_info.compaction_group_id != new_info.compaction_group_id {
164                        // table moved to another compaction group
165                        remove_table_from_compaction_group(
166                            &mut self.compaction_group_member_tables,
167                            prev_info.compaction_group_id,
168                            *table_id,
169                        );
170                        assert!(
171                            self.compaction_group_member_tables
172                                .entry(new_info.compaction_group_id)
173                                .or_default()
174                                .insert(*table_id)
175                        );
176                    }
177                    let prev_info = replace(prev_info, new_info);
178                    changed_table.insert(*table_id, Some(prev_info));
179                }
180                Entry::Vacant(entry) => {
181                    assert!(
182                        self.compaction_group_member_tables
183                            .entry(new_info.compaction_group_id)
184                            .or_default()
185                            .insert(*table_id)
186                    );
187                    has_bumped_committed_epoch = true;
188                    entry.insert(new_info);
189                    changed_table.insert(*table_id, None);
190                }
191            }
192        }
193        debug_assert_eq!(
194            self.compaction_group_member_tables,
195            Self::build_compaction_group_member_tables(&self.state_table_info)
196        );
197        (changed_table, has_bumped_committed_epoch)
198    }
199
200    pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
201        &self.state_table_info
202    }
203
204    pub fn compaction_group_member_table_ids(
205        &self,
206        compaction_group_id: CompactionGroupId,
207    ) -> &BTreeSet<TableId> {
208        static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
209        self.compaction_group_member_tables
210            .get(&compaction_group_id)
211            .unwrap_or_else(|| EMPTY_SET.deref())
212    }
213
214    pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
215        &self.compaction_group_member_tables
216    }
217
218    pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
219        self.state_table_info
220            .values()
221            .map(|info| info.committed_epoch)
222            .max()
223    }
224}
225
226#[derive(Debug, Clone, PartialEq)]
227pub struct HummockVersionCommon<T, L = T> {
228    pub id: HummockVersionId,
229    pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
230    #[deprecated]
231    pub(crate) max_committed_epoch: u64,
232    pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
233    pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
234    pub state_table_info: HummockVersionStateTableInfo,
235}
236
237pub type HummockVersion = HummockVersionCommon<SstableInfo>;
238
239pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
240
241impl Default for HummockVersion {
242    fn default() -> Self {
243        HummockVersion::from(&PbHummockVersion::default())
244    }
245}
246
247impl<T> HummockVersionCommon<T>
248where
249    T: for<'a> From<&'a PbSstableInfo>,
250    PbSstableInfo: for<'a> From<&'a T>,
251{
252    /// Convert the `PbHummockVersion` received from rpc to `HummockVersion`. No need to
253    /// maintain backward compatibility.
254    pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
255        pb_version.into()
256    }
257
258    /// Convert the `PbHummockVersion` deserialized from persisted state to `HummockVersion`.
259    /// We should maintain backward compatibility.
260    pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
261        pb_version.into()
262    }
263
264    pub fn to_protobuf(&self) -> PbHummockVersion {
265        self.into()
266    }
267}
268
269impl HummockVersion {
270    pub fn estimated_encode_len(&self) -> usize {
271        self.levels.len() * size_of::<CompactionGroupId>()
272            + self
273                .levels
274                .values()
275                .map(|level| level.estimated_encode_len())
276                .sum::<usize>()
277            + self.table_watermarks.len() * size_of::<u32>()
278            + self
279                .table_watermarks
280                .values()
281                .map(|table_watermark| table_watermark.estimated_encode_len())
282                .sum::<usize>()
283            + self
284                .table_change_log
285                .values()
286                .map(|c| {
287                    c.iter()
288                        .map(|l| {
289                            l.old_value
290                                .iter()
291                                .chain(l.new_value.iter())
292                                .map(|s| s.estimated_encode_len())
293                                .sum::<usize>()
294                        })
295                        .sum::<usize>()
296                })
297                .sum::<usize>()
298    }
299}
300
301impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
302where
303    T: for<'a> From<&'a PbSstableInfo>,
304{
305    fn from(pb_version: &PbHummockVersion) -> Self {
306        #[expect(deprecated)]
307        Self {
308            id: HummockVersionId(pb_version.id),
309            levels: pb_version
310                .levels
311                .iter()
312                .map(|(group_id, levels)| {
313                    (*group_id as CompactionGroupId, LevelsCommon::from(levels))
314                })
315                .collect(),
316            max_committed_epoch: pb_version.max_committed_epoch,
317            table_watermarks: pb_version
318                .table_watermarks
319                .iter()
320                .map(|(table_id, table_watermark)| {
321                    (
322                        TableId::new(*table_id),
323                        Arc::new(TableWatermarks::from(table_watermark)),
324                    )
325                })
326                .collect(),
327            table_change_log: pb_version
328                .table_change_logs
329                .iter()
330                .map(|(table_id, change_log)| {
331                    (
332                        TableId::new(*table_id),
333                        TableChangeLogCommon::from_protobuf(change_log),
334                    )
335                })
336                .collect(),
337            state_table_info: HummockVersionStateTableInfo::from_protobuf(
338                &pb_version.state_table_info,
339            ),
340        }
341    }
342}
343
344impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
345where
346    PbSstableInfo: for<'a> From<&'a T>,
347{
348    fn from(version: &HummockVersionCommon<T>) -> Self {
349        #[expect(deprecated)]
350        Self {
351            id: version.id.0,
352            levels: version
353                .levels
354                .iter()
355                .map(|(group_id, levels)| (*group_id as _, levels.into()))
356                .collect(),
357            max_committed_epoch: version.max_committed_epoch,
358            table_watermarks: version
359                .table_watermarks
360                .iter()
361                .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
362                .collect(),
363            table_change_logs: version
364                .table_change_log
365                .iter()
366                .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
367                .collect(),
368            state_table_info: version.state_table_info.to_protobuf(),
369        }
370    }
371}
372
373impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
374where
375    PbSstableInfo: From<T>,
376    PbSstableInfo: for<'a> From<&'a T>,
377{
378    fn from(version: HummockVersionCommon<T>) -> Self {
379        #[expect(deprecated)]
380        Self {
381            id: version.id.0,
382            levels: version
383                .levels
384                .into_iter()
385                .map(|(group_id, levels)| (group_id as _, levels.into()))
386                .collect(),
387            max_committed_epoch: version.max_committed_epoch,
388            table_watermarks: version
389                .table_watermarks
390                .into_iter()
391                .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
392                .collect(),
393            table_change_logs: version
394                .table_change_log
395                .into_iter()
396                .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
397                .collect(),
398            state_table_info: version.state_table_info.to_protobuf(),
399        }
400    }
401}
402
403impl HummockVersion {
404    pub fn next_version_id(&self) -> HummockVersionId {
405        self.id.next()
406    }
407
408    pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
409        // for backward-compatibility of previous hummock version delta
410        self.state_table_info.state_table_info.is_empty()
411            && self.levels.values().any(|group| {
412                // state_table_info is not previously filled, but there previously exists some tables
413                #[expect(deprecated)]
414                !group.member_table_ids.is_empty()
415            })
416    }
417
418    pub fn may_fill_backward_compatible_state_table_info_delta(
419        &self,
420        delta: &mut HummockVersionDelta,
421    ) {
422        #[expect(deprecated)]
423        // for backward-compatibility of previous hummock version delta
424        for (cg_id, group) in &self.levels {
425            for table_id in &group.member_table_ids {
426                assert!(
427                    delta
428                        .state_table_info_delta
429                        .insert(
430                            TableId::new(*table_id),
431                            StateTableInfoDelta {
432                                committed_epoch: self.max_committed_epoch,
433                                compaction_group_id: *cg_id,
434                            }
435                        )
436                        .is_none(),
437                    "duplicate table id {} in cg {}",
438                    table_id,
439                    cg_id
440                );
441            }
442        }
443    }
444
445    pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
446        #[expect(deprecated)]
447        let mut init_version = HummockVersion {
448            id: FIRST_VERSION_ID,
449            levels: Default::default(),
450            max_committed_epoch: INVALID_EPOCH,
451            table_watermarks: HashMap::new(),
452            table_change_log: HashMap::new(),
453            state_table_info: HummockVersionStateTableInfo::empty(),
454        };
455        for group_id in [
456            StaticCompactionGroupId::StateDefault as CompactionGroupId,
457            StaticCompactionGroupId::MaterializedView as CompactionGroupId,
458        ] {
459            init_version.levels.insert(
460                group_id,
461                build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
462            );
463        }
464        init_version
465    }
466
467    pub fn version_delta_after(&self) -> HummockVersionDelta {
468        #[expect(deprecated)]
469        HummockVersionDelta {
470            id: self.next_version_id(),
471            prev_id: self.id,
472            trivial_move: false,
473            max_committed_epoch: self.max_committed_epoch,
474            group_deltas: Default::default(),
475            new_table_watermarks: HashMap::new(),
476            removed_table_ids: HashSet::new(),
477            change_log_delta: HashMap::new(),
478            state_table_info_delta: Default::default(),
479        }
480    }
481
482    pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
483        let table_change_log = {
484            let mut table_change_log = HashMap::new();
485            for (table_id, log) in &mut self.table_change_log {
486                let change_log_iter =
487                    log.change_log_iter_mut()
488                        .map(|item| EpochNewChangeLogCommon {
489                            new_value: std::mem::take(&mut item.new_value),
490                            old_value: std::mem::take(&mut item.old_value),
491                            non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
492                            checkpoint_epoch: item.checkpoint_epoch,
493                        });
494                table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
495            }
496
497            table_change_log
498        };
499
500        let local_version = LocalHummockVersion::from(self);
501
502        (local_version, table_change_log)
503    }
504}
505
506impl<T, L> HummockVersionCommon<T, L> {
507    pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
508        self.state_table_info
509            .info()
510            .get(&table_id)
511            .map(|info| info.committed_epoch)
512    }
513}
514
515#[derive(Debug, PartialEq, Clone)]
516pub struct HummockVersionDeltaCommon<T, L = T> {
517    pub id: HummockVersionId,
518    pub prev_id: HummockVersionId,
519    pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
520    #[deprecated]
521    pub(crate) max_committed_epoch: u64,
522    pub trivial_move: bool,
523    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
524    pub removed_table_ids: HashSet<TableId>,
525    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
526    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
527}
528
529pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;
530
531pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
532
533impl Default for HummockVersionDelta {
534    fn default() -> Self {
535        HummockVersionDelta::from(&PbHummockVersionDelta::default())
536    }
537}
538
539impl<T> HummockVersionDeltaCommon<T>
540where
541    T: for<'a> From<&'a PbSstableInfo>,
542    PbSstableInfo: for<'a> From<&'a T>,
543{
544    /// Convert the `PbHummockVersionDelta` deserialized from persisted state to `HummockVersionDelta`.
545    /// We should maintain backward compatibility.
546    pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
547        delta.into()
548    }
549
550    /// Convert the `PbHummockVersionDelta` received from rpc to `HummockVersionDelta`. No need to
551    /// maintain backward compatibility.
552    pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
553        delta.into()
554    }
555
556    pub fn to_protobuf(&self) -> PbHummockVersionDelta {
557        self.into()
558    }
559}
560
561pub trait SstableIdReader {
562    fn sst_id(&self) -> HummockSstableId;
563}
564
565pub trait ObjectIdReader {
566    fn object_id(&self) -> HummockSstableObjectId;
567}
568
569impl<T> HummockVersionDeltaCommon<T>
570where
571    T: SstableIdReader + ObjectIdReader,
572{
573    /// Get the newly added object ids from the version delta.
574    ///
575    /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`,
576    /// but it is possible that the object is moved or split from other compaction groups or levels.
577    pub fn newly_added_object_ids(
578        &self,
579        exclude_table_change_log: bool,
580    ) -> HashSet<HummockObjectId> {
581        // DO NOT REMOVE THIS LINE
582        // This is to ensure that when adding new variant to `HummockObjectId`,
583        // the compiler will warn us if we forget to handle it here.
584        match HummockObjectId::Sstable(0.into()) {
585            HummockObjectId::Sstable(_) => {}
586        };
587        self.newly_added_sst_infos(exclude_table_change_log)
588            .map(|sst| HummockObjectId::Sstable(sst.object_id()))
589            .collect()
590    }
591
592    pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
593        self.newly_added_sst_infos(exclude_table_change_log)
594            .map(|sst| sst.sst_id())
595            .collect()
596    }
597}
598
599impl<T> HummockVersionDeltaCommon<T> {
600    pub fn newly_added_sst_infos(
601        &self,
602        exclude_table_change_log: bool,
603    ) -> impl Iterator<Item = &'_ T> {
604        let may_table_change_delta = if exclude_table_change_log {
605            None
606        } else {
607            Some(self.change_log_delta.values())
608        };
609        self.group_deltas
610            .values()
611            .flat_map(|group_deltas| {
612                group_deltas.group_deltas.iter().flat_map(|group_delta| {
613                    let sst_slice = match &group_delta {
614                        GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
615                        | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
616                            inserted_table_infos,
617                            ..
618                        }) => Some(inserted_table_infos.iter()),
619                        GroupDeltaCommon::GroupConstruct(_)
620                        | GroupDeltaCommon::GroupDestroy(_)
621                        | GroupDeltaCommon::GroupMerge(_) => None,
622                    };
623                    sst_slice.into_iter().flatten()
624                })
625            })
626            .chain(
627                may_table_change_delta
628                    .map(|v| {
629                        v.flat_map(|delta| {
630                            let new_log = &delta.new_log;
631                            new_log.new_value.iter().chain(new_log.old_value.iter())
632                        })
633                    })
634                    .into_iter()
635                    .flatten(),
636            )
637    }
638}
639
640impl HummockVersionDelta {
641    #[expect(deprecated)]
642    pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
643        self.max_committed_epoch
644    }
645}
646
647impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
648where
649    T: for<'a> From<&'a PbSstableInfo>,
650{
651    fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
652        #[expect(deprecated)]
653        Self {
654            id: HummockVersionId(pb_version_delta.id),
655            prev_id: HummockVersionId(pb_version_delta.prev_id),
656            group_deltas: pb_version_delta
657                .group_deltas
658                .iter()
659                .map(|(group_id, deltas)| {
660                    (
661                        *group_id as CompactionGroupId,
662                        GroupDeltasCommon::from(deltas),
663                    )
664                })
665                .collect(),
666            max_committed_epoch: pb_version_delta.max_committed_epoch,
667            trivial_move: pb_version_delta.trivial_move,
668            new_table_watermarks: pb_version_delta
669                .new_table_watermarks
670                .iter()
671                .map(|(table_id, watermarks)| {
672                    (TableId::new(*table_id), TableWatermarks::from(watermarks))
673                })
674                .collect(),
675            removed_table_ids: pb_version_delta
676                .removed_table_ids
677                .iter()
678                .map(|table_id| TableId::new(*table_id))
679                .collect(),
680            change_log_delta: pb_version_delta
681                .change_log_delta
682                .iter()
683                .map(|(table_id, log_delta)| {
684                    (
685                        TableId::new(*table_id),
686                        ChangeLogDeltaCommon {
687                            truncate_epoch: log_delta.truncate_epoch,
688                            new_log: log_delta.new_log.as_ref().unwrap().into(),
689                        },
690                    )
691                })
692                .collect(),
693
694            state_table_info_delta: pb_version_delta
695                .state_table_info_delta
696                .iter()
697                .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
698                .collect(),
699        }
700    }
701}
702
703impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
704where
705    PbSstableInfo: for<'a> From<&'a T>,
706{
707    fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
708        #[expect(deprecated)]
709        Self {
710            id: version_delta.id.0,
711            prev_id: version_delta.prev_id.0,
712            group_deltas: version_delta
713                .group_deltas
714                .iter()
715                .map(|(group_id, deltas)| (*group_id as _, deltas.into()))
716                .collect(),
717            max_committed_epoch: version_delta.max_committed_epoch,
718            trivial_move: version_delta.trivial_move,
719            new_table_watermarks: version_delta
720                .new_table_watermarks
721                .iter()
722                .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
723                .collect(),
724            removed_table_ids: version_delta
725                .removed_table_ids
726                .iter()
727                .map(|table_id| table_id.table_id)
728                .collect(),
729            change_log_delta: version_delta
730                .change_log_delta
731                .iter()
732                .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
733                .collect(),
734            state_table_info_delta: version_delta
735                .state_table_info_delta
736                .iter()
737                .map(|(table_id, delta)| (table_id.table_id, *delta))
738                .collect(),
739        }
740    }
741}
742
743impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
744where
745    PbSstableInfo: From<T>,
746{
747    fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
748        #[expect(deprecated)]
749        Self {
750            id: version_delta.id.0,
751            prev_id: version_delta.prev_id.0,
752            group_deltas: version_delta
753                .group_deltas
754                .into_iter()
755                .map(|(group_id, deltas)| (group_id as _, deltas.into()))
756                .collect(),
757            max_committed_epoch: version_delta.max_committed_epoch,
758            trivial_move: version_delta.trivial_move,
759            new_table_watermarks: version_delta
760                .new_table_watermarks
761                .into_iter()
762                .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
763                .collect(),
764            removed_table_ids: version_delta
765                .removed_table_ids
766                .into_iter()
767                .map(|table_id| table_id.table_id)
768                .collect(),
769            change_log_delta: version_delta
770                .change_log_delta
771                .into_iter()
772                .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
773                .collect(),
774            state_table_info_delta: version_delta
775                .state_table_info_delta
776                .into_iter()
777                .map(|(table_id, delta)| (table_id.table_id, delta))
778                .collect(),
779        }
780    }
781}
782
783impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
784where
785    T: From<PbSstableInfo>,
786{
787    fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
788        #[expect(deprecated)]
789        Self {
790            id: HummockVersionId(pb_version_delta.id),
791            prev_id: HummockVersionId(pb_version_delta.prev_id),
792            group_deltas: pb_version_delta
793                .group_deltas
794                .into_iter()
795                .map(|(group_id, deltas)| (group_id as CompactionGroupId, deltas.into()))
796                .collect(),
797            max_committed_epoch: pb_version_delta.max_committed_epoch,
798            trivial_move: pb_version_delta.trivial_move,
799            new_table_watermarks: pb_version_delta
800                .new_table_watermarks
801                .into_iter()
802                .map(|(table_id, watermarks)| (TableId::new(table_id), watermarks.into()))
803                .collect(),
804            removed_table_ids: pb_version_delta
805                .removed_table_ids
806                .into_iter()
807                .map(TableId::new)
808                .collect(),
809            change_log_delta: pb_version_delta
810                .change_log_delta
811                .iter()
812                .map(|(table_id, log_delta)| {
813                    (
814                        TableId::new(*table_id),
815                        ChangeLogDeltaCommon {
816                            new_log: log_delta.new_log.clone().unwrap().into(),
817                            truncate_epoch: log_delta.truncate_epoch,
818                        },
819                    )
820                })
821                .collect(),
822            state_table_info_delta: pb_version_delta
823                .state_table_info_delta
824                .iter()
825                .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
826                .collect(),
827        }
828    }
829}
830
831#[derive(Debug, PartialEq, Clone)]
832pub struct IntraLevelDeltaCommon<T> {
833    pub level_idx: u32,
834    pub l0_sub_level_id: u64,
835    pub removed_table_ids: HashSet<HummockSstableId>,
836    pub inserted_table_infos: Vec<T>,
837    pub vnode_partition_count: u32,
838    pub compaction_group_version_id: u64,
839}
840
841pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
842
843impl IntraLevelDelta {
844    pub fn estimated_encode_len(&self) -> usize {
845        size_of::<u32>()
846            + size_of::<u64>()
847            + self.removed_table_ids.len() * size_of::<u32>()
848            + self
849                .inserted_table_infos
850                .iter()
851                .map(|sst| sst.estimated_encode_len())
852                .sum::<usize>()
853            + size_of::<u32>()
854    }
855}
856
857impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
858where
859    T: From<PbSstableInfo>,
860{
861    fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
862        Self {
863            level_idx: pb_intra_level_delta.level_idx,
864            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
865            removed_table_ids: HashSet::from_iter(
866                pb_intra_level_delta
867                    .removed_table_ids
868                    .iter()
869                    .map(|sst_id| (*sst_id).into()),
870            ),
871            inserted_table_infos: pb_intra_level_delta
872                .inserted_table_infos
873                .into_iter()
874                .map(Into::into)
875                .collect_vec(),
876            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
877            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
878        }
879    }
880}
881
882impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
883where
884    PbSstableInfo: From<T>,
885{
886    fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
887        Self {
888            level_idx: intra_level_delta.level_idx,
889            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
890            removed_table_ids: intra_level_delta
891                .removed_table_ids
892                .into_iter()
893                .map(|sst_id| sst_id.inner())
894                .collect(),
895            inserted_table_infos: intra_level_delta
896                .inserted_table_infos
897                .into_iter()
898                .map(Into::into)
899                .collect_vec(),
900            vnode_partition_count: intra_level_delta.vnode_partition_count,
901            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
902        }
903    }
904}
905
906impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
907where
908    PbSstableInfo: for<'a> From<&'a T>,
909{
910    fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
911        Self {
912            level_idx: intra_level_delta.level_idx,
913            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
914            removed_table_ids: intra_level_delta
915                .removed_table_ids
916                .iter()
917                .map(|sst_id| sst_id.inner())
918                .collect(),
919            inserted_table_infos: intra_level_delta
920                .inserted_table_infos
921                .iter()
922                .map(Into::into)
923                .collect_vec(),
924            vnode_partition_count: intra_level_delta.vnode_partition_count,
925            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
926        }
927    }
928}
929
930impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
931where
932    T: for<'a> From<&'a PbSstableInfo>,
933{
934    fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
935        Self {
936            level_idx: pb_intra_level_delta.level_idx,
937            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
938            removed_table_ids: HashSet::from_iter(
939                pb_intra_level_delta
940                    .removed_table_ids
941                    .iter()
942                    .map(|sst_id| (*sst_id).into()),
943            ),
944            inserted_table_infos: pb_intra_level_delta
945                .inserted_table_infos
946                .iter()
947                .map(Into::into)
948                .collect_vec(),
949            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
950            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
951        }
952    }
953}
954
955impl IntraLevelDelta {
956    pub fn new(
957        level_idx: u32,
958        l0_sub_level_id: u64,
959        removed_table_ids: HashSet<HummockSstableId>,
960        inserted_table_infos: Vec<SstableInfo>,
961        vnode_partition_count: u32,
962        compaction_group_version_id: u64,
963    ) -> Self {
964        Self {
965            level_idx,
966            l0_sub_level_id,
967            removed_table_ids,
968            inserted_table_infos,
969            vnode_partition_count,
970            compaction_group_version_id,
971        }
972    }
973}
974
975#[derive(Debug, PartialEq, Clone)]
976pub enum GroupDeltaCommon<T> {
977    NewL0SubLevel(Vec<T>),
978    IntraLevel(IntraLevelDeltaCommon<T>),
979    GroupConstruct(Box<PbGroupConstruct>),
980    GroupDestroy(PbGroupDestroy),
981    GroupMerge(PbGroupMerge),
982}
983
984pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
985
986impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
987where
988    T: From<PbSstableInfo>,
989{
990    fn from(pb_group_delta: PbGroupDelta) -> Self {
991        match pb_group_delta.delta_type {
992            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
993                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
994            }
995            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
996                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
997            }
998            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
999                GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1000            }
1001            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1002                GroupDeltaCommon::GroupMerge(pb_group_merge)
1003            }
1004            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1005                pb_new_sub_level
1006                    .inserted_table_infos
1007                    .into_iter()
1008                    .map(T::from)
1009                    .collect(),
1010            ),
1011            None => panic!("delta_type is not set"),
1012        }
1013    }
1014}
1015
1016impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1017where
1018    PbSstableInfo: From<T>,
1019{
1020    fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1021        match group_delta {
1022            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1023                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1024            },
1025            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1026                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1027            },
1028            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1029                delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1030            },
1031            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1032                delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1033            },
1034            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1035                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1036                    inserted_table_infos: new_sub_level
1037                        .into_iter()
1038                        .map(PbSstableInfo::from)
1039                        .collect(),
1040                })),
1041            },
1042        }
1043    }
1044}
1045
1046impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1047where
1048    PbSstableInfo: for<'a> From<&'a T>,
1049{
1050    fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1051        match group_delta {
1052            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1053                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1054            },
1055            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1056                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1057            },
1058            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1059                delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1060            },
1061            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1062                delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1063            },
1064            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1065                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1066                    inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1067                })),
1068            },
1069        }
1070    }
1071}
1072
1073impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1074where
1075    T: for<'a> From<&'a PbSstableInfo>,
1076{
1077    fn from(pb_group_delta: &PbGroupDelta) -> Self {
1078        match &pb_group_delta.delta_type {
1079            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1080                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1081            }
1082            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1083                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1084            }
1085            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1086                GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1087            }
1088            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1089                GroupDeltaCommon::GroupMerge(*pb_group_merge)
1090            }
1091            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1092                pb_new_sub_level
1093                    .inserted_table_infos
1094                    .iter()
1095                    .map(T::from)
1096                    .collect(),
1097            ),
1098            None => panic!("delta_type is not set"),
1099        }
1100    }
1101}
1102
1103#[derive(Debug, PartialEq, Clone)]
1104pub struct GroupDeltasCommon<T> {
1105    pub group_deltas: Vec<GroupDeltaCommon<T>>,
1106}
1107
1108impl<T> Default for GroupDeltasCommon<T> {
1109    fn default() -> Self {
1110        Self {
1111            group_deltas: vec![],
1112        }
1113    }
1114}
1115
1116pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1117
1118impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1119where
1120    T: From<PbSstableInfo>,
1121{
1122    fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1123        Self {
1124            group_deltas: pb_group_deltas
1125                .group_deltas
1126                .into_iter()
1127                .map(GroupDeltaCommon::from)
1128                .collect_vec(),
1129        }
1130    }
1131}
1132
1133impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1134where
1135    PbSstableInfo: From<T>,
1136{
1137    fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1138        Self {
1139            group_deltas: group_deltas
1140                .group_deltas
1141                .into_iter()
1142                .map(|group_delta| group_delta.into())
1143                .collect_vec(),
1144        }
1145    }
1146}
1147
1148impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1149where
1150    PbSstableInfo: for<'a> From<&'a T>,
1151{
1152    fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1153        Self {
1154            group_deltas: group_deltas
1155                .group_deltas
1156                .iter()
1157                .map(|group_delta| group_delta.into())
1158                .collect_vec(),
1159        }
1160    }
1161}
1162
1163impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1164where
1165    T: for<'a> From<&'a PbSstableInfo>,
1166{
1167    fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1168        Self {
1169            group_deltas: pb_group_deltas
1170                .group_deltas
1171                .iter()
1172                .map(GroupDeltaCommon::from)
1173                .collect_vec(),
1174        }
1175    }
1176}
1177
1178impl<T> GroupDeltasCommon<T>
1179where
1180    PbSstableInfo: for<'a> From<&'a T>,
1181{
1182    pub fn to_protobuf(&self) -> PbGroupDeltas {
1183        self.into()
1184    }
1185}
1186
1187impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1188    #[expect(deprecated)]
1189    fn from(delta: HummockVersionDelta) -> Self {
1190        Self {
1191            id: delta.id,
1192            prev_id: delta.prev_id,
1193            group_deltas: delta.group_deltas,
1194            max_committed_epoch: delta.max_committed_epoch,
1195            trivial_move: delta.trivial_move,
1196            new_table_watermarks: delta.new_table_watermarks,
1197            removed_table_ids: delta.removed_table_ids,
1198            change_log_delta: delta
1199                .change_log_delta
1200                .into_iter()
1201                .map(|(k, v)| {
1202                    (
1203                        k,
1204                        ChangeLogDeltaCommon {
1205                            truncate_epoch: v.truncate_epoch,
1206                            new_log: EpochNewChangeLogCommon {
1207                                new_value: Vec::new(),
1208                                old_value: Vec::new(),
1209                                non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1210                                checkpoint_epoch: v.new_log.checkpoint_epoch,
1211                            },
1212                        },
1213                    )
1214                })
1215                .collect(),
1216            state_table_info_delta: delta.state_table_info_delta,
1217        }
1218    }
1219}
1220
1221impl From<HummockVersion> for LocalHummockVersion {
1222    #[expect(deprecated)]
1223    fn from(version: HummockVersion) -> Self {
1224        Self {
1225            id: version.id,
1226            levels: version.levels,
1227            max_committed_epoch: version.max_committed_epoch,
1228            table_watermarks: version.table_watermarks,
1229            table_change_log: version
1230                .table_change_log
1231                .into_iter()
1232                .map(|(k, v)| {
1233                    let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1234                        .change_log_into_iter()
1235                        .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1236                            new_value: Vec::new(),
1237                            old_value: Vec::new(),
1238                            non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1239                            checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1240                        })
1241                        .collect();
1242                    (k, TableChangeLogCommon::new(epoch_new_change_logs))
1243                })
1244                .collect(),
1245            state_table_info: version.state_table_info,
1246        }
1247    }
1248}