risingwave_hummock_sdk/
version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{
30    ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
31};
32use crate::compaction_group::StaticCompactionGroupId;
33use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
34use crate::level::LevelsCommon;
35use crate::sstable_info::SstableInfo;
36use crate::table_watermark::TableWatermarks;
37use crate::vector_index::{VectorIndex, VectorIndexDelta};
38use crate::{
39    CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
40    HummockSstableObjectId, HummockVersionId,
41};
42
/// Per-table state info of a hummock version, kept together with an in-memory index that
/// groups the table ids by compaction group.
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionStateTableInfo {
    /// Info of each state table, keyed by table id.
    state_table_info: HashMap<TableId, PbStateTableInfo>,

    // in memory index
    /// Member table ids of each compaction group, derived from `state_table_info`.
    compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
}
50
51impl HummockVersionStateTableInfo {
52    pub fn empty() -> Self {
53        Self {
54            state_table_info: HashMap::new(),
55            compaction_group_member_tables: HashMap::new(),
56        }
57    }
58
59    fn build_compaction_group_member_tables(
60        state_table_info: &HashMap<TableId, PbStateTableInfo>,
61    ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
62        let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
63        for (table_id, info) in state_table_info {
64            assert!(
65                ret.entry(info.compaction_group_id)
66                    .or_default()
67                    .insert(*table_id)
68            );
69        }
70        ret
71    }
72
73    pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
74        self.state_table_info
75            .iter()
76            .map(|(table_id, info)| (*table_id, info.compaction_group_id))
77            .collect()
78    }
79
80    pub fn from_protobuf(state_table_info: &HashMap<u32, PbStateTableInfo>) -> Self {
81        let state_table_info = state_table_info
82            .iter()
83            .map(|(table_id, info)| (TableId::new(*table_id), *info))
84            .collect();
85        let compaction_group_member_tables =
86            Self::build_compaction_group_member_tables(&state_table_info);
87        Self {
88            state_table_info,
89            compaction_group_member_tables,
90        }
91    }
92
93    pub fn to_protobuf(&self) -> HashMap<u32, PbStateTableInfo> {
94        self.state_table_info
95            .iter()
96            .map(|(table_id, info)| (table_id.table_id, *info))
97            .collect()
98    }
99
    /// Applies per-table info updates and table removals, keeping the compaction group member
    /// index in sync with the table info.
    ///
    /// Tables in `removed_table_id` are dropped first, and entries of `delta` that refer to a
    /// removed table are ignored. Every other entry of `delta` either overwrites the info of
    /// an existing table or registers the table as a new one.
    ///
    /// Returns a pair of:
    /// - the changed tables, each mapped to its previous info (`None` for a newly added table);
    /// - whether any table had its committed epoch advanced. A newly added table counts as
    ///   advanced.
    ///
    /// # Panics
    ///
    /// Panics if the new committed epoch of an existing table is lower than its previous one,
    /// or if the member index is found to be inconsistent with the table info.
    pub fn apply_delta(
        &mut self,
        delta: &HashMap<TableId, StateTableInfoDelta>,
        removed_table_id: &HashSet<TableId>,
    ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
        let mut changed_table = HashMap::new();
        let mut has_bumped_committed_epoch = false;
        // Removes `table_id` from the member set of `compaction_group_id`, and drops the set
        // from the index once it becomes empty.
        fn remove_table_from_compaction_group(
            compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
            compaction_group_id: CompactionGroupId,
            table_id: TableId,
        ) {
            let member_tables = compaction_group_member_tables
                .get_mut(&compaction_group_id)
                .expect("should exist");
            assert!(member_tables.remove(&table_id));
            if member_tables.is_empty() {
                assert!(
                    compaction_group_member_tables
                        .remove(&compaction_group_id)
                        .is_some()
                );
            }
        }
        // First pass: drop the removed tables and record their previous info.
        for table_id in removed_table_id {
            if let Some(prev_info) = self.state_table_info.remove(table_id) {
                remove_table_from_compaction_group(
                    &mut self.compaction_group_member_tables,
                    prev_info.compaction_group_id,
                    *table_id,
                );
                assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
            } else {
                // Removing an unknown table is tolerated and only logged.
                warn!(
                    table_id = table_id.table_id,
                    "table to remove does not exist"
                );
            }
        }
        // Second pass: apply the info updates of the tables that are not removed.
        for (table_id, delta) in delta {
            if removed_table_id.contains(table_id) {
                continue;
            }
            let new_info = StateTableInfo {
                committed_epoch: delta.committed_epoch,
                compaction_group_id: delta.compaction_group_id,
            };
            match self.state_table_info.entry(*table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_info = entry.get_mut();
                    // The committed epoch of an existing table must never go backwards.
                    assert!(
                        new_info.committed_epoch >= prev_info.committed_epoch,
                        "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
                        table_id.table_id,
                        prev_info,
                        new_info
                    );
                    if new_info.committed_epoch > prev_info.committed_epoch {
                        has_bumped_committed_epoch = true;
                    }
                    if prev_info.compaction_group_id != new_info.compaction_group_id {
                        // table moved to another compaction group
                        remove_table_from_compaction_group(
                            &mut self.compaction_group_member_tables,
                            prev_info.compaction_group_id,
                            *table_id,
                        );
                        assert!(
                            self.compaction_group_member_tables
                                .entry(new_info.compaction_group_id)
                                .or_default()
                                .insert(*table_id)
                        );
                    }
                    // Store the new info in place and report the info it replaced.
                    let prev_info = replace(prev_info, new_info);
                    changed_table.insert(*table_id, Some(prev_info));
                }
                Entry::Vacant(entry) => {
                    // A table seen for the first time: register it in its compaction group.
                    assert!(
                        self.compaction_group_member_tables
                            .entry(new_info.compaction_group_id)
                            .or_default()
                            .insert(*table_id)
                    );
                    has_bumped_committed_epoch = true;
                    entry.insert(new_info);
                    changed_table.insert(*table_id, None);
                }
            }
        }
        // In debug builds, check the incrementally maintained index against a full rebuild.
        debug_assert_eq!(
            self.compaction_group_member_tables,
            Self::build_compaction_group_member_tables(&self.state_table_info)
        );
        (changed_table, has_bumped_committed_epoch)
    }
196
    /// Returns the info of all state tables, keyed by table id.
    pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
        &self.state_table_info
    }
200
201    pub fn compaction_group_member_table_ids(
202        &self,
203        compaction_group_id: CompactionGroupId,
204    ) -> &BTreeSet<TableId> {
205        static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
206        self.compaction_group_member_tables
207            .get(&compaction_group_id)
208            .unwrap_or_else(|| EMPTY_SET.deref())
209    }
210
    /// Returns the whole in-memory index from compaction group id to its member table ids.
    pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
        &self.compaction_group_member_tables
    }
214
215    pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
216        self.state_table_info
217            .values()
218            .map(|info| info.committed_epoch)
219            .max()
220    }
221}
222
/// A hummock version, generic over the type `T` carried by the levels of each compaction
/// group and the type `L` carried by the table change logs (`L` defaults to `T`).
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionCommon<T, L = T> {
    /// Id of this version.
    pub id: HummockVersionId,
    /// Levels of each compaction group, keyed by compaction group id.
    pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
    /// Deprecated field. It is still carried through the protobuf conversions.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    /// Watermarks of each table, keyed by table id.
    pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
    /// Change log of each table, keyed by table id.
    pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
    /// Per-table state info together with the compaction group member index.
    pub state_table_info: HummockVersionStateTableInfo,
    /// Vector index of each table, keyed by table id.
    pub vector_indexes: HashMap<TableId, VectorIndex>,
}
234
/// Hummock version whose levels and table change logs both carry [`SstableInfo`].
pub type HummockVersion = HummockVersionCommon<SstableInfo>;

/// Hummock version whose levels carry [`SstableInfo`] while its table change logs carry `()`.
pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
238
239impl Default for HummockVersion {
240    fn default() -> Self {
241        HummockVersion::from(&PbHummockVersion::default())
242    }
243}
244
245impl<T> HummockVersionCommon<T>
246where
247    T: for<'a> From<&'a PbSstableInfo>,
248    PbSstableInfo: for<'a> From<&'a T>,
249{
250    /// Convert the `PbHummockVersion` received from rpc to `HummockVersion`. No need to
251    /// maintain backward compatibility.
252    pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
253        pb_version.into()
254    }
255
256    /// Convert the `PbHummockVersion` deserialized from persisted state to `HummockVersion`.
257    /// We should maintain backward compatibility.
258    pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
259        pb_version.into()
260    }
261
262    pub fn to_protobuf(&self) -> PbHummockVersion {
263        self.into()
264    }
265}
266
267impl HummockVersion {
268    pub fn estimated_encode_len(&self) -> usize {
269        self.levels.len() * size_of::<CompactionGroupId>()
270            + self
271                .levels
272                .values()
273                .map(|level| level.estimated_encode_len())
274                .sum::<usize>()
275            + self.table_watermarks.len() * size_of::<u32>()
276            + self
277                .table_watermarks
278                .values()
279                .map(|table_watermark| table_watermark.estimated_encode_len())
280                .sum::<usize>()
281            + self
282                .table_change_log
283                .values()
284                .map(|c| {
285                    c.iter()
286                        .map(|l| {
287                            l.old_value
288                                .iter()
289                                .chain(l.new_value.iter())
290                                .map(|s| s.estimated_encode_len())
291                                .sum::<usize>()
292                        })
293                        .sum::<usize>()
294                })
295                .sum::<usize>()
296    }
297}
298
299impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
300where
301    T: for<'a> From<&'a PbSstableInfo>,
302{
303    fn from(pb_version: &PbHummockVersion) -> Self {
304        #[expect(deprecated)]
305        Self {
306            id: HummockVersionId(pb_version.id),
307            levels: pb_version
308                .levels
309                .iter()
310                .map(|(group_id, levels)| {
311                    (*group_id as CompactionGroupId, LevelsCommon::from(levels))
312                })
313                .collect(),
314            max_committed_epoch: pb_version.max_committed_epoch,
315            table_watermarks: pb_version
316                .table_watermarks
317                .iter()
318                .map(|(table_id, table_watermark)| {
319                    (
320                        TableId::new(*table_id),
321                        Arc::new(TableWatermarks::from(table_watermark)),
322                    )
323                })
324                .collect(),
325            table_change_log: pb_version
326                .table_change_logs
327                .iter()
328                .map(|(table_id, change_log)| {
329                    (
330                        TableId::new(*table_id),
331                        TableChangeLogCommon::from_protobuf(change_log),
332                    )
333                })
334                .collect(),
335            state_table_info: HummockVersionStateTableInfo::from_protobuf(
336                &pb_version.state_table_info,
337            ),
338            vector_indexes: pb_version
339                .vector_indexes
340                .iter()
341                .map(|(table_id, index)| (table_id.into(), index.clone().into()))
342                .collect(),
343        }
344    }
345}
346
347impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
348where
349    PbSstableInfo: for<'a> From<&'a T>,
350{
351    fn from(version: &HummockVersionCommon<T>) -> Self {
352        #[expect(deprecated)]
353        Self {
354            id: version.id.0,
355            levels: version
356                .levels
357                .iter()
358                .map(|(group_id, levels)| (*group_id as _, levels.into()))
359                .collect(),
360            max_committed_epoch: version.max_committed_epoch,
361            table_watermarks: version
362                .table_watermarks
363                .iter()
364                .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
365                .collect(),
366            table_change_logs: version
367                .table_change_log
368                .iter()
369                .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
370                .collect(),
371            state_table_info: version.state_table_info.to_protobuf(),
372            vector_indexes: version
373                .vector_indexes
374                .iter()
375                .map(|(table_id, index)| (table_id.table_id, index.clone().into()))
376                .collect(),
377        }
378    }
379}
380
381impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
382where
383    PbSstableInfo: From<T>,
384    PbSstableInfo: for<'a> From<&'a T>,
385{
386    fn from(version: HummockVersionCommon<T>) -> Self {
387        #[expect(deprecated)]
388        Self {
389            id: version.id.0,
390            levels: version
391                .levels
392                .into_iter()
393                .map(|(group_id, levels)| (group_id as _, levels.into()))
394                .collect(),
395            max_committed_epoch: version.max_committed_epoch,
396            table_watermarks: version
397                .table_watermarks
398                .into_iter()
399                .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
400                .collect(),
401            table_change_logs: version
402                .table_change_log
403                .into_iter()
404                .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
405                .collect(),
406            state_table_info: version.state_table_info.to_protobuf(),
407            vector_indexes: version
408                .vector_indexes
409                .into_iter()
410                .map(|(table_id, index)| (table_id.table_id, index.into()))
411                .collect(),
412        }
413    }
414}
415
416impl HummockVersion {
    /// Returns the id that follows the id of this version, as given by `self.id.next()`.
    pub fn next_version_id(&self) -> HummockVersionId {
        self.id.next()
    }
420
421    pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
422        // for backward-compatibility of previous hummock version delta
423        self.state_table_info.state_table_info.is_empty()
424            && self.levels.values().any(|group| {
425                // state_table_info is not previously filled, but there previously exists some tables
426                #[expect(deprecated)]
427                !group.member_table_ids.is_empty()
428            })
429    }
430
431    pub fn may_fill_backward_compatible_state_table_info_delta(
432        &self,
433        delta: &mut HummockVersionDelta,
434    ) {
435        #[expect(deprecated)]
436        // for backward-compatibility of previous hummock version delta
437        for (cg_id, group) in &self.levels {
438            for table_id in &group.member_table_ids {
439                assert!(
440                    delta
441                        .state_table_info_delta
442                        .insert(
443                            TableId::new(*table_id),
444                            StateTableInfoDelta {
445                                committed_epoch: self.max_committed_epoch,
446                                compaction_group_id: *cg_id,
447                            }
448                        )
449                        .is_none(),
450                    "duplicate table id {} in cg {}",
451                    table_id,
452                    cg_id
453                );
454            }
455        }
456    }
457
458    pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
459        #[expect(deprecated)]
460        let mut init_version = HummockVersion {
461            id: FIRST_VERSION_ID,
462            levels: Default::default(),
463            max_committed_epoch: INVALID_EPOCH,
464            table_watermarks: HashMap::new(),
465            table_change_log: HashMap::new(),
466            state_table_info: HummockVersionStateTableInfo::empty(),
467            vector_indexes: Default::default(),
468        };
469        for group_id in [
470            StaticCompactionGroupId::StateDefault as CompactionGroupId,
471            StaticCompactionGroupId::MaterializedView as CompactionGroupId,
472        ] {
473            init_version.levels.insert(
474                group_id,
475                build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
476            );
477        }
478        init_version
479    }
480
481    pub fn version_delta_after(&self) -> HummockVersionDelta {
482        #[expect(deprecated)]
483        HummockVersionDelta {
484            id: self.next_version_id(),
485            prev_id: self.id,
486            trivial_move: false,
487            max_committed_epoch: self.max_committed_epoch,
488            group_deltas: Default::default(),
489            new_table_watermarks: HashMap::new(),
490            removed_table_ids: HashSet::new(),
491            change_log_delta: HashMap::new(),
492            state_table_info_delta: Default::default(),
493            vector_index_delta: Default::default(),
494        }
495    }
496
497    pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
498        let table_change_log = {
499            let mut table_change_log = HashMap::new();
500            for (table_id, log) in &mut self.table_change_log {
501                let change_log_iter =
502                    log.change_log_iter_mut()
503                        .map(|item| EpochNewChangeLogCommon {
504                            new_value: std::mem::take(&mut item.new_value),
505                            old_value: std::mem::take(&mut item.old_value),
506                            non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
507                            checkpoint_epoch: item.checkpoint_epoch,
508                        });
509                table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
510            }
511
512            table_change_log
513        };
514
515        let local_version = LocalHummockVersion::from(self);
516
517        (local_version, table_change_log)
518    }
519}
520
521impl<T, L> HummockVersionCommon<T, L> {
522    pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
523        self.state_table_info
524            .info()
525            .get(&table_id)
526            .map(|info| info.committed_epoch)
527    }
528}
529
/// A delta between two hummock versions, generic over the type `T` carried by the group
/// deltas and the type `L` carried by the change log deltas (`L` defaults to `T`).
#[derive(Debug, PartialEq, Clone)]
pub struct HummockVersionDeltaCommon<T, L = T> {
    /// Id of the version delta.
    pub id: HummockVersionId,
    /// Id recorded as the previous one of `id`.
    pub prev_id: HummockVersionId,
    /// Deltas of each compaction group, keyed by compaction group id.
    pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
    /// Deprecated field. It is still carried through the protobuf conversions.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    /// Carried through the protobuf conversions as is.
    pub trivial_move: bool,
    /// New watermarks of each table, keyed by table id.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Ids of the tables removed in this delta.
    pub removed_table_ids: HashSet<TableId>,
    /// Change log delta of each table, keyed by table id.
    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
    /// State table info delta of each table, keyed by table id.
    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
    /// Vector index delta of each table, keyed by table id.
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
}
544
/// Version delta whose group deltas and change log deltas both carry [`SstableInfo`].
pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;

/// Version delta whose group deltas carry [`SstableInfo`] while its change log deltas carry `()`.
pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
548
549impl Default for HummockVersionDelta {
550    fn default() -> Self {
551        HummockVersionDelta::from(&PbHummockVersionDelta::default())
552    }
553}
554
555impl<T> HummockVersionDeltaCommon<T>
556where
557    T: for<'a> From<&'a PbSstableInfo>,
558    PbSstableInfo: for<'a> From<&'a T>,
559{
560    /// Convert the `PbHummockVersionDelta` deserialized from persisted state to `HummockVersionDelta`.
561    /// We should maintain backward compatibility.
562    pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
563        delta.into()
564    }
565
566    /// Convert the `PbHummockVersionDelta` received from rpc to `HummockVersionDelta`. No need to
567    /// maintain backward compatibility.
568    pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
569        delta.into()
570    }
571
572    pub fn to_protobuf(&self) -> PbHummockVersionDelta {
573        self.into()
574    }
575}
576
/// Implemented by types that can report an sstable id.
pub trait SstableIdReader {
    /// Returns the sstable id of `self`.
    fn sst_id(&self) -> HummockSstableId;
}
580
/// Implemented by types that can report an sstable object id.
pub trait ObjectIdReader {
    /// Returns the sstable object id of `self`.
    fn object_id(&self) -> HummockSstableObjectId;
}
584
585impl<T> HummockVersionDeltaCommon<T>
586where
587    T: SstableIdReader + ObjectIdReader,
588{
589    /// Get the newly added object ids from the version delta.
590    ///
591    /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`,
592    /// but it is possible that the object is moved or split from other compaction groups or levels.
593    pub fn newly_added_object_ids(
594        &self,
595        exclude_table_change_log: bool,
596    ) -> HashSet<HummockObjectId> {
597        // DO NOT REMOVE THIS LINE
598        // This is to ensure that when adding new variant to `HummockObjectId`,
599        // the compiler will warn us if we forget to handle it here.
600        match HummockObjectId::Sstable(0.into()) {
601            HummockObjectId::Sstable(_) => {}
602            HummockObjectId::VectorFile(_) => {}
603            HummockObjectId::HnswGraphFile(_) => {}
604        };
605        self.newly_added_sst_infos(exclude_table_change_log)
606            .map(|sst| HummockObjectId::Sstable(sst.object_id()))
607            .chain(
608                self.vector_index_delta
609                    .values()
610                    .flat_map(|vector_index_delta| {
611                        vector_index_delta
612                            .newly_added_objects()
613                            .map(|(object_id, _)| object_id)
614                    }),
615            )
616            .collect()
617    }
618
619    pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
620        self.newly_added_sst_infos(exclude_table_change_log)
621            .map(|sst| sst.sst_id())
622            .collect()
623    }
624}
625
626impl<T> HummockVersionDeltaCommon<T> {
627    pub fn newly_added_sst_infos(
628        &self,
629        exclude_table_change_log: bool,
630    ) -> impl Iterator<Item = &'_ T> {
631        let may_table_change_delta = if exclude_table_change_log {
632            None
633        } else {
634            Some(self.change_log_delta.values())
635        };
636        self.group_deltas
637            .values()
638            .flat_map(|group_deltas| {
639                group_deltas.group_deltas.iter().flat_map(|group_delta| {
640                    let sst_slice = match &group_delta {
641                        GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
642                        | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
643                            inserted_table_infos,
644                            ..
645                        }) => Some(inserted_table_infos.iter()),
646                        GroupDeltaCommon::GroupConstruct(_)
647                        | GroupDeltaCommon::GroupDestroy(_)
648                        | GroupDeltaCommon::GroupMerge(_)
649                        | GroupDeltaCommon::TruncateTables(_) => None,
650                    };
651                    sst_slice.into_iter().flatten()
652                })
653            })
654            .chain(
655                may_table_change_delta
656                    .map(|v| {
657                        v.flat_map(|delta| {
658                            let new_log = &delta.new_log;
659                            new_log.new_value.iter().chain(new_log.old_value.iter())
660                        })
661                    })
662                    .into_iter()
663                    .flatten(),
664            )
665    }
666}
667
impl HummockVersionDelta {
    /// Returns the deprecated `max_committed_epoch` stored in this delta.
    // NOTE(review): judging by the name this accessor is meant for migration code only;
    // confirm against the callers.
    #[expect(deprecated)]
    pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
        self.max_committed_epoch
    }
}
674
675impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
676where
677    T: for<'a> From<&'a PbSstableInfo>,
678{
679    fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
680        #[expect(deprecated)]
681        Self {
682            id: HummockVersionId(pb_version_delta.id),
683            prev_id: HummockVersionId(pb_version_delta.prev_id),
684            group_deltas: pb_version_delta
685                .group_deltas
686                .iter()
687                .map(|(group_id, deltas)| {
688                    (
689                        *group_id as CompactionGroupId,
690                        GroupDeltasCommon::from(deltas),
691                    )
692                })
693                .collect(),
694            max_committed_epoch: pb_version_delta.max_committed_epoch,
695            trivial_move: pb_version_delta.trivial_move,
696            new_table_watermarks: pb_version_delta
697                .new_table_watermarks
698                .iter()
699                .map(|(table_id, watermarks)| {
700                    (TableId::new(*table_id), TableWatermarks::from(watermarks))
701                })
702                .collect(),
703            removed_table_ids: pb_version_delta
704                .removed_table_ids
705                .iter()
706                .map(|table_id| TableId::new(*table_id))
707                .collect(),
708            change_log_delta: pb_version_delta
709                .change_log_delta
710                .iter()
711                .map(|(table_id, log_delta)| {
712                    (
713                        TableId::new(*table_id),
714                        ChangeLogDeltaCommon {
715                            truncate_epoch: log_delta.truncate_epoch,
716                            new_log: log_delta.new_log.as_ref().unwrap().into(),
717                        },
718                    )
719                })
720                .collect(),
721
722            state_table_info_delta: pb_version_delta
723                .state_table_info_delta
724                .iter()
725                .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
726                .collect(),
727            vector_index_delta: pb_version_delta
728                .vector_index_delta
729                .iter()
730                .map(|(table_id, delta)| (TableId::new(*table_id), delta.clone().into()))
731                .collect(),
732        }
733    }
734}
735
736impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
737where
738    PbSstableInfo: for<'a> From<&'a T>,
739{
740    fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
741        #[expect(deprecated)]
742        Self {
743            id: version_delta.id.0,
744            prev_id: version_delta.prev_id.0,
745            group_deltas: version_delta
746                .group_deltas
747                .iter()
748                .map(|(group_id, deltas)| (*group_id as _, deltas.into()))
749                .collect(),
750            max_committed_epoch: version_delta.max_committed_epoch,
751            trivial_move: version_delta.trivial_move,
752            new_table_watermarks: version_delta
753                .new_table_watermarks
754                .iter()
755                .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
756                .collect(),
757            removed_table_ids: version_delta
758                .removed_table_ids
759                .iter()
760                .map(|table_id| table_id.table_id)
761                .collect(),
762            change_log_delta: version_delta
763                .change_log_delta
764                .iter()
765                .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
766                .collect(),
767            state_table_info_delta: version_delta
768                .state_table_info_delta
769                .iter()
770                .map(|(table_id, delta)| (table_id.table_id, *delta))
771                .collect(),
772            vector_index_delta: version_delta
773                .vector_index_delta
774                .iter()
775                .map(|(table_id, delta)| (table_id.table_id, delta.clone().into()))
776                .collect(),
777        }
778    }
779}
780
781impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
782where
783    PbSstableInfo: From<T>,
784{
785    fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
786        #[expect(deprecated)]
787        Self {
788            id: version_delta.id.0,
789            prev_id: version_delta.prev_id.0,
790            group_deltas: version_delta
791                .group_deltas
792                .into_iter()
793                .map(|(group_id, deltas)| (group_id as _, deltas.into()))
794                .collect(),
795            max_committed_epoch: version_delta.max_committed_epoch,
796            trivial_move: version_delta.trivial_move,
797            new_table_watermarks: version_delta
798                .new_table_watermarks
799                .into_iter()
800                .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
801                .collect(),
802            removed_table_ids: version_delta
803                .removed_table_ids
804                .into_iter()
805                .map(|table_id| table_id.table_id)
806                .collect(),
807            change_log_delta: version_delta
808                .change_log_delta
809                .into_iter()
810                .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
811                .collect(),
812            state_table_info_delta: version_delta
813                .state_table_info_delta
814                .into_iter()
815                .map(|(table_id, delta)| (table_id.table_id, delta))
816                .collect(),
817            vector_index_delta: version_delta
818                .vector_index_delta
819                .into_iter()
820                .map(|(table_id, delta)| (table_id.table_id, delta.into()))
821                .collect(),
822        }
823    }
824}
825
826impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
827where
828    T: From<PbSstableInfo>,
829{
830    fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
831        #[expect(deprecated)]
832        Self {
833            id: HummockVersionId(pb_version_delta.id),
834            prev_id: HummockVersionId(pb_version_delta.prev_id),
835            group_deltas: pb_version_delta
836                .group_deltas
837                .into_iter()
838                .map(|(group_id, deltas)| (group_id as CompactionGroupId, deltas.into()))
839                .collect(),
840            max_committed_epoch: pb_version_delta.max_committed_epoch,
841            trivial_move: pb_version_delta.trivial_move,
842            new_table_watermarks: pb_version_delta
843                .new_table_watermarks
844                .into_iter()
845                .map(|(table_id, watermarks)| (TableId::new(table_id), watermarks.into()))
846                .collect(),
847            removed_table_ids: pb_version_delta
848                .removed_table_ids
849                .into_iter()
850                .map(TableId::new)
851                .collect(),
852            change_log_delta: pb_version_delta
853                .change_log_delta
854                .iter()
855                .map(|(table_id, log_delta)| {
856                    (
857                        TableId::new(*table_id),
858                        ChangeLogDeltaCommon {
859                            new_log: log_delta.new_log.clone().unwrap().into(),
860                            truncate_epoch: log_delta.truncate_epoch,
861                        },
862                    )
863                })
864                .collect(),
865            state_table_info_delta: pb_version_delta
866                .state_table_info_delta
867                .iter()
868                .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
869                .collect(),
870            vector_index_delta: pb_version_delta
871                .vector_index_delta
872                .into_iter()
873                .map(|(table_id, delta)| (TableId::new(table_id), delta.into()))
874                .collect(),
875        }
876    }
877}
878
/// In-memory form of an intra-level delta, generic over the SST info type `T`.
///
/// The `From` impls in this file convert it to and from `PbIntraLevelDelta`
/// field by field.
#[derive(Debug, PartialEq, Clone)]
pub struct IntraLevelDeltaCommon<T> {
    pub level_idx: u32,
    pub l0_sub_level_id: u64,
    // Kept as a set here; the protobuf message carries the raw `inner()` value of each id.
    pub removed_table_ids: HashSet<HummockSstableId>,
    // SST infos in the representation `T` chosen by the user of this type.
    pub inserted_table_infos: Vec<T>,
    pub vnode_partition_count: u32,
    pub compaction_group_version_id: u64,
}
888
/// [`IntraLevelDeltaCommon`] specialised to [`SstableInfo`].
pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
890
891impl IntraLevelDelta {
892    pub fn estimated_encode_len(&self) -> usize {
893        size_of::<u32>()
894            + size_of::<u64>()
895            + self.removed_table_ids.len() * size_of::<u32>()
896            + self
897                .inserted_table_infos
898                .iter()
899                .map(|sst| sst.estimated_encode_len())
900                .sum::<usize>()
901            + size_of::<u32>()
902    }
903}
904
905impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
906where
907    T: From<PbSstableInfo>,
908{
909    fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
910        Self {
911            level_idx: pb_intra_level_delta.level_idx,
912            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
913            removed_table_ids: HashSet::from_iter(
914                pb_intra_level_delta
915                    .removed_table_ids
916                    .iter()
917                    .map(|sst_id| (*sst_id).into()),
918            ),
919            inserted_table_infos: pb_intra_level_delta
920                .inserted_table_infos
921                .into_iter()
922                .map(Into::into)
923                .collect_vec(),
924            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
925            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
926        }
927    }
928}
929
930impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
931where
932    PbSstableInfo: From<T>,
933{
934    fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
935        Self {
936            level_idx: intra_level_delta.level_idx,
937            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
938            removed_table_ids: intra_level_delta
939                .removed_table_ids
940                .into_iter()
941                .map(|sst_id| sst_id.inner())
942                .collect(),
943            inserted_table_infos: intra_level_delta
944                .inserted_table_infos
945                .into_iter()
946                .map(Into::into)
947                .collect_vec(),
948            vnode_partition_count: intra_level_delta.vnode_partition_count,
949            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
950        }
951    }
952}
953
954impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
955where
956    PbSstableInfo: for<'a> From<&'a T>,
957{
958    fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
959        Self {
960            level_idx: intra_level_delta.level_idx,
961            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
962            removed_table_ids: intra_level_delta
963                .removed_table_ids
964                .iter()
965                .map(|sst_id| sst_id.inner())
966                .collect(),
967            inserted_table_infos: intra_level_delta
968                .inserted_table_infos
969                .iter()
970                .map(Into::into)
971                .collect_vec(),
972            vnode_partition_count: intra_level_delta.vnode_partition_count,
973            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
974        }
975    }
976}
977
978impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
979where
980    T: for<'a> From<&'a PbSstableInfo>,
981{
982    fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
983        Self {
984            level_idx: pb_intra_level_delta.level_idx,
985            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
986            removed_table_ids: HashSet::from_iter(
987                pb_intra_level_delta
988                    .removed_table_ids
989                    .iter()
990                    .map(|sst_id| (*sst_id).into()),
991            ),
992            inserted_table_infos: pb_intra_level_delta
993                .inserted_table_infos
994                .iter()
995                .map(Into::into)
996                .collect_vec(),
997            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
998            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
999        }
1000    }
1001}
1002
1003impl IntraLevelDelta {
1004    pub fn new(
1005        level_idx: u32,
1006        l0_sub_level_id: u64,
1007        removed_table_ids: HashSet<HummockSstableId>,
1008        inserted_table_infos: Vec<SstableInfo>,
1009        vnode_partition_count: u32,
1010        compaction_group_version_id: u64,
1011    ) -> Self {
1012        Self {
1013            level_idx,
1014            l0_sub_level_id,
1015            removed_table_ids,
1016            inserted_table_infos,
1017            vnode_partition_count,
1018            compaction_group_version_id,
1019        }
1020    }
1021}
1022
/// In-memory form of `PbGroupDelta`, generic over the SST info type `T`.
///
/// Each variant corresponds to one `PbDeltaType` case; the `From` impls in this
/// file convert between the two representations.
#[derive(Debug, PartialEq, Clone)]
pub enum GroupDeltaCommon<T> {
    // SST infos taken from `PbNewL0SubLevel::inserted_table_infos`.
    NewL0SubLevel(Vec<T>),
    IntraLevel(IntraLevelDeltaCommon<T>),
    // The protobuf payload is stored as-is, behind a `Box`.
    GroupConstruct(Box<PbGroupConstruct>),
    GroupDestroy(PbGroupDestroy),
    GroupMerge(PbGroupMerge),
    // Raw ids taken from `PbTruncateTables::table_ids`.
    TruncateTables(Vec<u32>),
}
1032
/// [`GroupDeltaCommon`] specialised to [`SstableInfo`].
pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
1034
1035impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
1036where
1037    T: From<PbSstableInfo>,
1038{
1039    fn from(pb_group_delta: PbGroupDelta) -> Self {
1040        match pb_group_delta.delta_type {
1041            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1042                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1043            }
1044            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1045                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
1046            }
1047            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1048                GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1049            }
1050            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1051                GroupDeltaCommon::GroupMerge(pb_group_merge)
1052            }
1053            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1054                pb_new_sub_level
1055                    .inserted_table_infos
1056                    .into_iter()
1057                    .map(T::from)
1058                    .collect(),
1059            ),
1060            Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1061                GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids)
1062            }
1063
1064            None => panic!("delta_type is not set"),
1065        }
1066    }
1067}
1068
1069impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1070where
1071    PbSstableInfo: From<T>,
1072{
1073    fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1074        match group_delta {
1075            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1076                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1077            },
1078            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1079                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1080            },
1081            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1082                delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1083            },
1084            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1085                delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1086            },
1087            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1088                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1089                    inserted_table_infos: new_sub_level
1090                        .into_iter()
1091                        .map(PbSstableInfo::from)
1092                        .collect(),
1093                })),
1094            },
1095            GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1096                delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables { table_ids })),
1097            },
1098        }
1099    }
1100}
1101
1102impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1103where
1104    PbSstableInfo: for<'a> From<&'a T>,
1105{
1106    fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1107        match group_delta {
1108            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1109                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1110            },
1111            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1112                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1113            },
1114            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1115                delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1116            },
1117            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1118                delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1119            },
1120            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1121                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1122                    inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1123                })),
1124            },
1125            GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1126                delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1127                    table_ids: table_ids.clone(),
1128                })),
1129            },
1130        }
1131    }
1132}
1133
1134impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1135where
1136    T: for<'a> From<&'a PbSstableInfo>,
1137{
1138    fn from(pb_group_delta: &PbGroupDelta) -> Self {
1139        match &pb_group_delta.delta_type {
1140            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1141                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1142            }
1143            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1144                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1145            }
1146            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1147                GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1148            }
1149            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1150                GroupDeltaCommon::GroupMerge(*pb_group_merge)
1151            }
1152            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1153                pb_new_sub_level
1154                    .inserted_table_infos
1155                    .iter()
1156                    .map(T::from)
1157                    .collect(),
1158            ),
1159            Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1160                GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids.clone())
1161            }
1162            None => panic!("delta_type is not set"),
1163        }
1164    }
1165}
1166
/// An ordered list of [`GroupDeltaCommon`] values; the in-memory form of `PbGroupDeltas`.
#[derive(Debug, PartialEq, Clone)]
pub struct GroupDeltasCommon<T> {
    pub group_deltas: Vec<GroupDeltaCommon<T>>,
}
1171
1172impl<T> Default for GroupDeltasCommon<T> {
1173    fn default() -> Self {
1174        Self {
1175            group_deltas: vec![],
1176        }
1177    }
1178}
1179
/// [`GroupDeltasCommon`] specialised to [`SstableInfo`].
pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1181
1182impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1183where
1184    T: From<PbSstableInfo>,
1185{
1186    fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1187        Self {
1188            group_deltas: pb_group_deltas
1189                .group_deltas
1190                .into_iter()
1191                .map(GroupDeltaCommon::from)
1192                .collect_vec(),
1193        }
1194    }
1195}
1196
1197impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1198where
1199    PbSstableInfo: From<T>,
1200{
1201    fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1202        Self {
1203            group_deltas: group_deltas
1204                .group_deltas
1205                .into_iter()
1206                .map(|group_delta| group_delta.into())
1207                .collect_vec(),
1208        }
1209    }
1210}
1211
1212impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1213where
1214    PbSstableInfo: for<'a> From<&'a T>,
1215{
1216    fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1217        Self {
1218            group_deltas: group_deltas
1219                .group_deltas
1220                .iter()
1221                .map(|group_delta| group_delta.into())
1222                .collect_vec(),
1223        }
1224    }
1225}
1226
1227impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1228where
1229    T: for<'a> From<&'a PbSstableInfo>,
1230{
1231    fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1232        Self {
1233            group_deltas: pb_group_deltas
1234                .group_deltas
1235                .iter()
1236                .map(GroupDeltaCommon::from)
1237                .collect_vec(),
1238        }
1239    }
1240}
1241
1242impl<T> GroupDeltasCommon<T>
1243where
1244    PbSstableInfo: for<'a> From<&'a T>,
1245{
1246    pub fn to_protobuf(&self) -> PbGroupDeltas {
1247        self.into()
1248    }
1249}
1250
1251impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1252    #[expect(deprecated)]
1253    fn from(delta: HummockVersionDelta) -> Self {
1254        Self {
1255            id: delta.id,
1256            prev_id: delta.prev_id,
1257            group_deltas: delta.group_deltas,
1258            max_committed_epoch: delta.max_committed_epoch,
1259            trivial_move: delta.trivial_move,
1260            new_table_watermarks: delta.new_table_watermarks,
1261            removed_table_ids: delta.removed_table_ids,
1262            change_log_delta: delta
1263                .change_log_delta
1264                .into_iter()
1265                .map(|(k, v)| {
1266                    (
1267                        k,
1268                        ChangeLogDeltaCommon {
1269                            truncate_epoch: v.truncate_epoch,
1270                            new_log: EpochNewChangeLogCommon {
1271                                new_value: Vec::new(),
1272                                old_value: Vec::new(),
1273                                non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1274                                checkpoint_epoch: v.new_log.checkpoint_epoch,
1275                            },
1276                        },
1277                    )
1278                })
1279                .collect(),
1280            state_table_info_delta: delta.state_table_info_delta,
1281            vector_index_delta: delta.vector_index_delta,
1282        }
1283    }
1284}
1285
1286impl From<HummockVersion> for LocalHummockVersion {
1287    #[expect(deprecated)]
1288    fn from(version: HummockVersion) -> Self {
1289        Self {
1290            id: version.id,
1291            levels: version.levels,
1292            max_committed_epoch: version.max_committed_epoch,
1293            table_watermarks: version.table_watermarks,
1294            table_change_log: version
1295                .table_change_log
1296                .into_iter()
1297                .map(|(k, v)| {
1298                    let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1299                        .change_log_into_iter()
1300                        .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1301                            new_value: Vec::new(),
1302                            old_value: Vec::new(),
1303                            non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1304                            checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1305                        })
1306                        .collect();
1307                    (k, TableChangeLogCommon::new(epoch_new_change_logs))
1308                })
1309                .collect(),
1310            state_table_info: version.state_table_info,
1311            vector_indexes: version.vector_indexes,
1312        }
1313    }
1314}