risingwave_hummock_sdk/
version.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon};
30use crate::compaction_group::StaticCompactionGroupId;
31use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
32use crate::level::LevelsCommon;
33use crate::sstable_info::SstableInfo;
34use crate::table_watermark::TableWatermarks;
35use crate::vector_index::{VectorIndex, VectorIndexDelta};
36use crate::{
37    CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
38    HummockSstableObjectId, HummockVersionId,
39};
40
41pub const MAX_HUMMOCK_VERSION_ID: HummockVersionId = HummockVersionId::new(i64::MAX as _);
42
43#[derive(Debug, Clone, PartialEq)]
44pub struct HummockVersionStateTableInfo {
45    state_table_info: HashMap<TableId, PbStateTableInfo>,
46
47    // in memory index
48    compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
49}
50
51impl HummockVersionStateTableInfo {
52    pub fn empty() -> Self {
53        Self {
54            state_table_info: HashMap::new(),
55            compaction_group_member_tables: HashMap::new(),
56        }
57    }
58
59    fn build_compaction_group_member_tables(
60        state_table_info: &HashMap<TableId, PbStateTableInfo>,
61    ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
62        let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
63        for (table_id, info) in state_table_info {
64            assert!(
65                ret.entry(info.compaction_group_id)
66                    .or_default()
67                    .insert(*table_id)
68            );
69        }
70        ret
71    }
72
73    pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
74        self.state_table_info
75            .iter()
76            .map(|(table_id, info)| (*table_id, info.compaction_group_id))
77            .collect()
78    }
79
80    pub fn from_protobuf(state_table_info: &HashMap<TableId, PbStateTableInfo>) -> Self {
81        let state_table_info = state_table_info
82            .iter()
83            .map(|(table_id, info)| (*table_id, *info))
84            .collect();
85        let compaction_group_member_tables =
86            Self::build_compaction_group_member_tables(&state_table_info);
87        Self {
88            state_table_info,
89            compaction_group_member_tables,
90        }
91    }
92
93    pub fn from_protobuf_owned(state_table_info: HashMap<TableId, PbStateTableInfo>) -> Self {
94        let compaction_group_member_tables =
95            Self::build_compaction_group_member_tables(&state_table_info);
96        Self {
97            state_table_info,
98            compaction_group_member_tables,
99        }
100    }
101
102    pub fn apply_delta(
103        &mut self,
104        delta: &HashMap<TableId, StateTableInfoDelta>,
105        removed_table_id: &HashSet<TableId>,
106    ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
107        let mut changed_table = HashMap::new();
108        let mut has_bumped_committed_epoch = false;
109        fn remove_table_from_compaction_group(
110            compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
111            compaction_group_id: CompactionGroupId,
112            table_id: TableId,
113        ) {
114            let member_tables = compaction_group_member_tables
115                .get_mut(&compaction_group_id)
116                .expect("should exist");
117            assert!(member_tables.remove(&table_id));
118            if member_tables.is_empty() {
119                assert!(
120                    compaction_group_member_tables
121                        .remove(&compaction_group_id)
122                        .is_some()
123                );
124            }
125        }
126        for table_id in removed_table_id {
127            if let Some(prev_info) = self.state_table_info.remove(table_id) {
128                remove_table_from_compaction_group(
129                    &mut self.compaction_group_member_tables,
130                    prev_info.compaction_group_id,
131                    *table_id,
132                );
133                assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
134            } else {
135                warn!(
136                    %table_id,
137                    "table to remove does not exist"
138                );
139            }
140        }
141        for (table_id, delta) in delta {
142            if removed_table_id.contains(table_id) {
143                continue;
144            }
145            let new_info = StateTableInfo {
146                committed_epoch: delta.committed_epoch,
147                compaction_group_id: delta.compaction_group_id,
148            };
149            match self.state_table_info.entry(*table_id) {
150                Entry::Occupied(mut entry) => {
151                    let prev_info = entry.get_mut();
152                    assert!(
153                        new_info.committed_epoch >= prev_info.committed_epoch,
154                        "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
155                        table_id,
156                        prev_info,
157                        new_info
158                    );
159                    if new_info.committed_epoch > prev_info.committed_epoch {
160                        has_bumped_committed_epoch = true;
161                    }
162                    if prev_info.compaction_group_id != new_info.compaction_group_id {
163                        // table moved to another compaction group
164                        remove_table_from_compaction_group(
165                            &mut self.compaction_group_member_tables,
166                            prev_info.compaction_group_id,
167                            *table_id,
168                        );
169                        assert!(
170                            self.compaction_group_member_tables
171                                .entry(new_info.compaction_group_id)
172                                .or_default()
173                                .insert(*table_id)
174                        );
175                    }
176                    let prev_info = replace(prev_info, new_info);
177                    changed_table.insert(*table_id, Some(prev_info));
178                }
179                Entry::Vacant(entry) => {
180                    assert!(
181                        self.compaction_group_member_tables
182                            .entry(new_info.compaction_group_id)
183                            .or_default()
184                            .insert(*table_id)
185                    );
186                    has_bumped_committed_epoch = true;
187                    entry.insert(new_info);
188                    changed_table.insert(*table_id, None);
189                }
190            }
191        }
192        debug_assert_eq!(
193            self.compaction_group_member_tables,
194            Self::build_compaction_group_member_tables(&self.state_table_info)
195        );
196        (changed_table, has_bumped_committed_epoch)
197    }
198
199    pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
200        &self.state_table_info
201    }
202
203    pub fn compaction_group_member_table_ids(
204        &self,
205        compaction_group_id: CompactionGroupId,
206    ) -> &BTreeSet<TableId> {
207        static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
208        self.compaction_group_member_tables
209            .get(&compaction_group_id)
210            .unwrap_or_else(|| EMPTY_SET.deref())
211    }
212
213    pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
214        &self.compaction_group_member_tables
215    }
216
217    pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
218        self.state_table_info
219            .values()
220            .map(|info| info.committed_epoch)
221            .max()
222    }
223}
224
225#[derive(Debug, Clone, PartialEq)]
226pub struct HummockVersionCommon<T, L = T> {
227    pub id: HummockVersionId,
228    pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
229    #[deprecated]
230    pub(crate) max_committed_epoch: u64,
231    pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
232    #[deprecated]
233    pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
234    pub state_table_info: HummockVersionStateTableInfo,
235    pub vector_indexes: HashMap<TableId, VectorIndex>,
236}
237
238pub type HummockVersion = HummockVersionCommon<SstableInfo>;
239
240pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
241
242impl Default for HummockVersion {
243    fn default() -> Self {
244        HummockVersion::from(&PbHummockVersion::default())
245    }
246}
247
248impl<T> HummockVersionCommon<T>
249where
250    T: for<'a> From<&'a PbSstableInfo>,
251    PbSstableInfo: for<'a> From<&'a T>,
252{
253    /// Convert the `PbHummockVersion` received from rpc to `HummockVersion`. No need to
254    /// maintain backward compatibility.
255    pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
256        pb_version.into()
257    }
258
259    /// Convert the `PbHummockVersion` deserialized from persisted state to `HummockVersion`.
260    /// We should maintain backward compatibility.
261    pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
262        pb_version.into()
263    }
264
265    pub fn to_protobuf(&self) -> PbHummockVersion {
266        self.into()
267    }
268}
269
270impl<T> HummockVersionCommon<T>
271where
272    T: From<PbSstableInfo>,
273    PbSstableInfo: for<'a> From<&'a T>,
274{
275    /// Convert an owned `PbHummockVersion` deserialized from persisted state to `HummockVersion`,
276    /// moving data instead of cloning for better performance on large checkpoints.
277    pub fn from_persisted_protobuf_owned(pb_version: PbHummockVersion) -> Self {
278        pb_version.into()
279    }
280}
281
282impl HummockVersion {
283    #[expect(deprecated)]
284    pub fn estimated_encode_len(&self) -> usize {
285        self.levels.len() * size_of::<CompactionGroupId>()
286            + self
287                .levels
288                .values()
289                .map(|level| level.estimated_encode_len())
290                .sum::<usize>()
291            + self.table_watermarks.len() * size_of::<u32>()
292            + self
293                .table_watermarks
294                .values()
295                .map(|table_watermark| table_watermark.estimated_encode_len())
296                .sum::<usize>()
297            + self
298                .table_change_log
299                .values()
300                .map(|c| {
301                    c.iter()
302                        .map(|l| {
303                            l.old_value
304                                .iter()
305                                .chain(l.new_value.iter())
306                                .map(|s| s.estimated_encode_len())
307                                .sum::<usize>()
308                        })
309                        .sum::<usize>()
310                })
311                .sum::<usize>()
312    }
313}
314
315impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
316where
317    T: for<'a> From<&'a PbSstableInfo>,
318{
319    fn from(pb_version: &PbHummockVersion) -> Self {
320        #[expect(deprecated)]
321        Self {
322            id: pb_version.id,
323            levels: pb_version
324                .levels
325                .iter()
326                .map(|(group_id, levels)| (*group_id, LevelsCommon::from(levels)))
327                .collect(),
328            max_committed_epoch: pb_version.max_committed_epoch,
329            table_watermarks: pb_version
330                .table_watermarks
331                .iter()
332                .map(|(table_id, table_watermark)| {
333                    (*table_id, Arc::new(TableWatermarks::from(table_watermark)))
334                })
335                .collect(),
336            table_change_log: pb_version
337                .table_change_logs
338                .iter()
339                .map(|(table_id, change_log)| {
340                    (*table_id, TableChangeLogCommon::from_protobuf(change_log))
341                })
342                .collect(),
343            state_table_info: HummockVersionStateTableInfo::from_protobuf(
344                &pb_version.state_table_info,
345            ),
346            vector_indexes: pb_version
347                .vector_indexes
348                .iter()
349                .map(|(table_id, index)| (*table_id, index.clone().into()))
350                .collect(),
351        }
352    }
353}
354
355impl<T> From<PbHummockVersion> for HummockVersionCommon<T>
356where
357    T: From<PbSstableInfo>,
358{
359    fn from(pb_version: PbHummockVersion) -> Self {
360        #[expect(deprecated)]
361        Self {
362            id: pb_version.id,
363            levels: pb_version
364                .levels
365                .into_iter()
366                .map(|(group_id, levels)| (group_id, LevelsCommon::from(levels)))
367                .collect(),
368            max_committed_epoch: pb_version.max_committed_epoch,
369            table_watermarks: pb_version
370                .table_watermarks
371                .into_iter()
372                .map(|(table_id, table_watermark)| {
373                    (table_id, Arc::new(TableWatermarks::from(table_watermark)))
374                })
375                .collect(),
376            table_change_log: pb_version
377                .table_change_logs
378                .into_iter()
379                .map(|(table_id, change_log)| {
380                    (
381                        table_id,
382                        TableChangeLogCommon::from_protobuf_owned(change_log),
383                    )
384                })
385                .collect(),
386            state_table_info: HummockVersionStateTableInfo::from_protobuf_owned(
387                pb_version.state_table_info,
388            ),
389            vector_indexes: pb_version
390                .vector_indexes
391                .into_iter()
392                .map(|(table_id, index)| (table_id, index.into()))
393                .collect(),
394        }
395    }
396}
397
398impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
399where
400    PbSstableInfo: for<'a> From<&'a T>,
401{
402    fn from(version: &HummockVersionCommon<T>) -> Self {
403        #[expect(deprecated)]
404        Self {
405            id: version.id,
406            levels: version
407                .levels
408                .iter()
409                .map(|(group_id, levels)| (*group_id, levels.into()))
410                .collect(),
411            max_committed_epoch: version.max_committed_epoch,
412            table_watermarks: version
413                .table_watermarks
414                .iter()
415                .map(|(table_id, watermark)| (*table_id, watermark.as_ref().into()))
416                .collect(),
417            table_change_logs: version
418                .table_change_log
419                .iter()
420                .map(|(table_id, change_log)| (*table_id, change_log.to_protobuf()))
421                .collect(),
422            state_table_info: version.state_table_info.state_table_info.clone(),
423            vector_indexes: version
424                .vector_indexes
425                .iter()
426                .map(|(table_id, index)| (*table_id, index.clone().into()))
427                .collect(),
428        }
429    }
430}
431
432impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
433where
434    PbSstableInfo: From<T>,
435    PbSstableInfo: for<'a> From<&'a T>,
436{
437    fn from(version: HummockVersionCommon<T>) -> Self {
438        #[expect(deprecated)]
439        Self {
440            id: version.id,
441            levels: version
442                .levels
443                .into_iter()
444                .map(|(group_id, levels)| (group_id, levels.into()))
445                .collect(),
446            max_committed_epoch: version.max_committed_epoch,
447            table_watermarks: version
448                .table_watermarks
449                .into_iter()
450                .map(|(table_id, watermark)| (table_id, watermark.as_ref().into()))
451                .collect(),
452            table_change_logs: version
453                .table_change_log
454                .into_iter()
455                .map(|(table_id, change_log)| (table_id, change_log.to_protobuf()))
456                .collect(),
457            state_table_info: version.state_table_info.state_table_info.clone(),
458            vector_indexes: version
459                .vector_indexes
460                .into_iter()
461                .map(|(table_id, index)| (table_id, index.into()))
462                .collect(),
463        }
464    }
465}
466
467impl HummockVersion {
468    pub fn next_version_id(&self) -> HummockVersionId {
469        self.id + 1
470    }
471
472    pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
473        // for backward-compatibility of previous hummock version delta
474        self.state_table_info.state_table_info.is_empty()
475            && self.levels.values().any(|group| {
476                // state_table_info is not previously filled, but there previously exists some tables
477                #[expect(deprecated)]
478                !group.member_table_ids.is_empty()
479            })
480    }
481
482    pub fn may_fill_backward_compatible_state_table_info_delta(
483        &self,
484        delta: &mut HummockVersionDelta,
485    ) {
486        #[expect(deprecated)]
487        // for backward-compatibility of previous hummock version delta
488        for (cg_id, group) in &self.levels {
489            for table_id in &group.member_table_ids {
490                assert!(
491                    delta
492                        .state_table_info_delta
493                        .insert(
494                            (*table_id).into(),
495                            StateTableInfoDelta {
496                                committed_epoch: self.max_committed_epoch,
497                                compaction_group_id: *cg_id,
498                            }
499                        )
500                        .is_none(),
501                    "duplicate table id {} in cg {}",
502                    table_id,
503                    cg_id
504                );
505            }
506        }
507    }
508
509    pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
510        #[expect(deprecated)]
511        let mut init_version = HummockVersion {
512            id: FIRST_VERSION_ID,
513            levels: Default::default(),
514            max_committed_epoch: INVALID_EPOCH,
515            table_watermarks: HashMap::new(),
516            table_change_log: HashMap::new(),
517            state_table_info: HummockVersionStateTableInfo::empty(),
518            vector_indexes: Default::default(),
519        };
520        for group_id in [
521            StaticCompactionGroupId::StateDefault as CompactionGroupId,
522            StaticCompactionGroupId::MaterializedView as CompactionGroupId,
523        ] {
524            init_version.levels.insert(
525                group_id,
526                build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
527            );
528        }
529        init_version
530    }
531
532    pub fn version_delta_after(&self) -> HummockVersionDelta {
533        #[expect(deprecated)]
534        HummockVersionDelta {
535            id: self.next_version_id(),
536            prev_id: self.id,
537            trivial_move: false,
538            max_committed_epoch: self.max_committed_epoch,
539            group_deltas: Default::default(),
540            new_table_watermarks: HashMap::new(),
541            removed_table_ids: HashSet::new(),
542            change_log_delta: HashMap::new(),
543            state_table_info_delta: Default::default(),
544            vector_index_delta: Default::default(),
545        }
546    }
547}
548
549impl<T, L> HummockVersionCommon<T, L> {
550    pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
551        self.state_table_info
552            .info()
553            .get(&table_id)
554            .map(|info| info.committed_epoch)
555    }
556}
557
558#[derive(Debug, PartialEq, Clone)]
559pub struct HummockVersionDeltaCommon<T, L = T> {
560    pub id: HummockVersionId,
561    pub prev_id: HummockVersionId,
562    pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
563    #[deprecated]
564    pub(crate) max_committed_epoch: u64,
565    pub trivial_move: bool,
566    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
567    pub removed_table_ids: HashSet<TableId>,
568    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
569    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
570    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
571}
572
573pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;
574
575pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
576
577impl Default for HummockVersionDelta {
578    fn default() -> Self {
579        HummockVersionDelta::from(&PbHummockVersionDelta::default())
580    }
581}
582
583impl<T> HummockVersionDeltaCommon<T>
584where
585    T: for<'a> From<&'a PbSstableInfo>,
586    PbSstableInfo: for<'a> From<&'a T>,
587{
588    /// Convert the `PbHummockVersionDelta` deserialized from persisted state to `HummockVersionDelta`.
589    /// We should maintain backward compatibility.
590    pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
591        delta.into()
592    }
593
594    /// Convert the `PbHummockVersionDelta` received from rpc to `HummockVersionDelta`. No need to
595    /// maintain backward compatibility.
596    pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
597        delta.into()
598    }
599
600    pub fn to_protobuf(&self) -> PbHummockVersionDelta {
601        self.into()
602    }
603}
604
605impl<T> HummockVersionDeltaCommon<T>
606where
607    T: From<PbSstableInfo>,
608    PbSstableInfo: for<'a> From<&'a T>,
609{
610    /// Convert an owned `PbHummockVersionDelta` deserialized from persisted state to
611    /// `HummockVersionDelta`, moving data instead of cloning.
612    pub fn from_persisted_protobuf_owned(delta: PbHummockVersionDelta) -> Self {
613        delta.into()
614    }
615}
616
617pub trait SstableIdReader {
618    fn sst_id(&self) -> HummockSstableId;
619}
620
621pub trait ObjectIdReader {
622    fn object_id(&self) -> HummockSstableObjectId;
623}
624
625impl<T> HummockVersionDeltaCommon<T>
626where
627    T: SstableIdReader + ObjectIdReader,
628{
629    /// Get the newly added object ids from the version delta.
630    ///
631    /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`,
632    /// but it is possible that the object is moved or split from other compaction groups or levels.
633    pub fn newly_added_object_ids(
634        &self,
635        exclude_table_change_log: bool,
636    ) -> HashSet<HummockObjectId> {
637        // DO NOT REMOVE THIS LINE
638        // This is to ensure that when adding new variant to `HummockObjectId`,
639        // the compiler will warn us if we forget to handle it here.
640        match HummockObjectId::Sstable(0.into()) {
641            HummockObjectId::Sstable(_) => {}
642            HummockObjectId::VectorFile(_) => {}
643            HummockObjectId::HnswGraphFile(_) => {}
644        };
645        self.newly_added_sst_infos(exclude_table_change_log)
646            .map(|sst| HummockObjectId::Sstable(sst.object_id()))
647            .chain(
648                self.vector_index_delta
649                    .values()
650                    .flat_map(|vector_index_delta| {
651                        vector_index_delta
652                            .newly_added_objects()
653                            .map(|(object_id, _)| object_id)
654                    }),
655            )
656            .collect()
657    }
658
659    pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
660        self.newly_added_sst_infos(exclude_table_change_log)
661            .map(|sst| sst.sst_id())
662            .collect()
663    }
664}
665
666impl<T> HummockVersionDeltaCommon<T> {
667    pub fn newly_added_sst_infos(
668        &self,
669        exclude_table_change_log: bool,
670    ) -> impl Iterator<Item = &'_ T> {
671        let may_table_change_delta = if exclude_table_change_log {
672            None
673        } else {
674            Some(self.change_log_delta.values())
675        };
676        self.group_deltas
677            .values()
678            .flat_map(|group_deltas| {
679                group_deltas.group_deltas.iter().flat_map(|group_delta| {
680                    let sst_slice = match &group_delta {
681                        GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
682                        | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
683                            inserted_table_infos,
684                            ..
685                        }) => Some(inserted_table_infos.iter()),
686                        GroupDeltaCommon::GroupConstruct(_)
687                        | GroupDeltaCommon::GroupDestroy(_)
688                        | GroupDeltaCommon::GroupMerge(_)
689                        | GroupDeltaCommon::TruncateTables(_) => None,
690                    };
691                    sst_slice.into_iter().flatten()
692                })
693            })
694            .chain(
695                may_table_change_delta
696                    .map(|v| {
697                        v.flat_map(|delta| {
698                            let new_log = &delta.new_log;
699                            new_log.new_value.iter().chain(new_log.old_value.iter())
700                        })
701                    })
702                    .into_iter()
703                    .flatten(),
704            )
705    }
706}
707
708impl HummockVersionDelta {
709    #[expect(deprecated)]
710    pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
711        self.max_committed_epoch
712    }
713}
714
715impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
716where
717    T: for<'a> From<&'a PbSstableInfo>,
718{
719    fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
720        #[expect(deprecated)]
721        Self {
722            id: pb_version_delta.id,
723            prev_id: pb_version_delta.prev_id,
724            group_deltas: pb_version_delta
725                .group_deltas
726                .iter()
727                .map(|(group_id, deltas)| (*group_id, GroupDeltasCommon::from(deltas)))
728                .collect(),
729            max_committed_epoch: pb_version_delta.max_committed_epoch,
730            trivial_move: pb_version_delta.trivial_move,
731            new_table_watermarks: pb_version_delta
732                .new_table_watermarks
733                .iter()
734                .map(|(table_id, watermarks)| (*table_id, TableWatermarks::from(watermarks)))
735                .collect(),
736            removed_table_ids: pb_version_delta.removed_table_ids.iter().copied().collect(),
737            change_log_delta: pb_version_delta
738                .change_log_delta
739                .iter()
740                .map(|(table_id, log_delta)| {
741                    (
742                        *table_id,
743                        ChangeLogDeltaCommon {
744                            truncate_epoch: log_delta.truncate_epoch,
745                            new_log: log_delta.new_log.as_ref().unwrap().into(),
746                        },
747                    )
748                })
749                .collect(),
750
751            state_table_info_delta: pb_version_delta
752                .state_table_info_delta
753                .iter()
754                .map(|(table_id, delta)| (*table_id, *delta))
755                .collect(),
756            vector_index_delta: pb_version_delta
757                .vector_index_delta
758                .iter()
759                .map(|(table_id, delta)| (*table_id, delta.clone().into()))
760                .collect(),
761        }
762    }
763}
764
765impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
766where
767    PbSstableInfo: for<'a> From<&'a T>,
768{
769    fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
770        #[expect(deprecated)]
771        Self {
772            id: version_delta.id,
773            prev_id: version_delta.prev_id,
774            group_deltas: version_delta
775                .group_deltas
776                .iter()
777                .map(|(group_id, deltas)| (*group_id, deltas.into()))
778                .collect(),
779            max_committed_epoch: version_delta.max_committed_epoch,
780            trivial_move: version_delta.trivial_move,
781            new_table_watermarks: version_delta
782                .new_table_watermarks
783                .iter()
784                .map(|(table_id, watermarks)| (*table_id, watermarks.into()))
785                .collect(),
786            removed_table_ids: version_delta.removed_table_ids.iter().copied().collect(),
787            change_log_delta: version_delta
788                .change_log_delta
789                .iter()
790                .map(|(table_id, log_delta)| (*table_id, log_delta.into()))
791                .collect(),
792            state_table_info_delta: version_delta.state_table_info_delta.clone(),
793            vector_index_delta: version_delta
794                .vector_index_delta
795                .iter()
796                .map(|(table_id, delta)| (*table_id, delta.clone().into()))
797                .collect(),
798        }
799    }
800}
801
802impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
803where
804    PbSstableInfo: From<T>,
805{
806    fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
807        #[expect(deprecated)]
808        Self {
809            id: version_delta.id,
810            prev_id: version_delta.prev_id,
811            group_deltas: version_delta
812                .group_deltas
813                .into_iter()
814                .map(|(group_id, deltas)| (group_id, deltas.into()))
815                .collect(),
816            max_committed_epoch: version_delta.max_committed_epoch,
817            trivial_move: version_delta.trivial_move,
818            new_table_watermarks: version_delta
819                .new_table_watermarks
820                .into_iter()
821                .map(|(table_id, watermarks)| (table_id, watermarks.into()))
822                .collect(),
823            removed_table_ids: version_delta.removed_table_ids.into_iter().collect(),
824            change_log_delta: version_delta
825                .change_log_delta
826                .into_iter()
827                .map(|(table_id, log_delta)| (table_id, log_delta.into()))
828                .collect(),
829            state_table_info_delta: version_delta.state_table_info_delta,
830            vector_index_delta: version_delta
831                .vector_index_delta
832                .into_iter()
833                .map(|(table_id, delta)| (table_id, delta.into()))
834                .collect(),
835        }
836    }
837}
838
839impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
840where
841    T: From<PbSstableInfo>,
842{
843    fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
844        #[expect(deprecated)]
845        Self {
846            id: pb_version_delta.id,
847            prev_id: pb_version_delta.prev_id,
848            group_deltas: pb_version_delta
849                .group_deltas
850                .into_iter()
851                .map(|(group_id, deltas)| (group_id, deltas.into()))
852                .collect(),
853            max_committed_epoch: pb_version_delta.max_committed_epoch,
854            trivial_move: pb_version_delta.trivial_move,
855            new_table_watermarks: pb_version_delta
856                .new_table_watermarks
857                .into_iter()
858                .map(|(table_id, watermarks)| (table_id, watermarks.into()))
859                .collect(),
860            removed_table_ids: pb_version_delta.removed_table_ids.into_iter().collect(),
861            change_log_delta: pb_version_delta
862                .change_log_delta
863                .into_iter()
864                .map(|(table_id, log_delta)| (table_id, log_delta.into()))
865                .collect(),
866            state_table_info_delta: pb_version_delta.state_table_info_delta,
867            vector_index_delta: pb_version_delta
868                .vector_index_delta
869                .into_iter()
870                .map(|(table_id, delta)| (table_id, delta.into()))
871                .collect(),
872        }
873    }
874}
875
/// In-memory form of `PbIntraLevelDelta`, generic over the SST info representation `T`.
///
/// The `From` impls in this file convert it to and from `PbIntraLevelDelta` field by field.
#[derive(Debug, PartialEq, Clone)]
pub struct IntraLevelDeltaCommon<T> {
    /// Index of the level this delta refers to.
    pub level_idx: u32,
    /// Sub-level id within L0.
    /// NOTE(review): presumably only meaningful when the delta targets L0 — confirm.
    pub l0_sub_level_id: u64,
    /// Set of SST ids carried as "removed" by this delta. Stored as a set, while the
    /// protobuf message stores a plain sequence.
    pub removed_table_ids: HashSet<HummockSstableId>,
    /// SST infos carried as "inserted" by this delta.
    pub inserted_table_infos: Vec<T>,
    /// Copied verbatim to and from the protobuf field of the same name.
    pub vnode_partition_count: u32,
    /// Copied verbatim to and from the protobuf field of the same name.
    pub compaction_group_version_id: u64,
}
885
/// [`IntraLevelDeltaCommon`] specialized to [`SstableInfo`].
pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
887
888impl IntraLevelDelta {
889    pub fn estimated_encode_len(&self) -> usize {
890        size_of::<u32>()
891            + size_of::<u64>()
892            + self.removed_table_ids.len() * size_of::<u32>()
893            + self
894                .inserted_table_infos
895                .iter()
896                .map(|sst| sst.estimated_encode_len())
897                .sum::<usize>()
898            + size_of::<u32>()
899    }
900}
901
902impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
903where
904    T: From<PbSstableInfo>,
905{
906    fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
907        Self {
908            level_idx: pb_intra_level_delta.level_idx,
909            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
910            removed_table_ids: HashSet::from_iter(
911                pb_intra_level_delta.removed_table_ids.iter().copied(),
912            ),
913            inserted_table_infos: pb_intra_level_delta
914                .inserted_table_infos
915                .into_iter()
916                .map(Into::into)
917                .collect_vec(),
918            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
919            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
920        }
921    }
922}
923
924impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
925where
926    PbSstableInfo: From<T>,
927{
928    fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
929        Self {
930            level_idx: intra_level_delta.level_idx,
931            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
932            removed_table_ids: intra_level_delta.removed_table_ids.into_iter().collect(),
933            inserted_table_infos: intra_level_delta
934                .inserted_table_infos
935                .into_iter()
936                .map(Into::into)
937                .collect_vec(),
938            vnode_partition_count: intra_level_delta.vnode_partition_count,
939            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
940        }
941    }
942}
943
944impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
945where
946    PbSstableInfo: for<'a> From<&'a T>,
947{
948    fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
949        Self {
950            level_idx: intra_level_delta.level_idx,
951            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
952            removed_table_ids: intra_level_delta
953                .removed_table_ids
954                .iter()
955                .copied()
956                .collect(),
957            inserted_table_infos: intra_level_delta
958                .inserted_table_infos
959                .iter()
960                .map(Into::into)
961                .collect_vec(),
962            vnode_partition_count: intra_level_delta.vnode_partition_count,
963            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
964        }
965    }
966}
967
968impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
969where
970    T: for<'a> From<&'a PbSstableInfo>,
971{
972    fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
973        Self {
974            level_idx: pb_intra_level_delta.level_idx,
975            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
976            removed_table_ids: HashSet::from_iter(
977                pb_intra_level_delta.removed_table_ids.iter().copied(),
978            ),
979            inserted_table_infos: pb_intra_level_delta
980                .inserted_table_infos
981                .iter()
982                .map(Into::into)
983                .collect_vec(),
984            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
985            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
986        }
987    }
988}
989
990impl IntraLevelDelta {
991    pub fn new(
992        level_idx: u32,
993        l0_sub_level_id: u64,
994        removed_table_ids: HashSet<HummockSstableId>,
995        inserted_table_infos: Vec<SstableInfo>,
996        vnode_partition_count: u32,
997        compaction_group_version_id: u64,
998    ) -> Self {
999        Self {
1000            level_idx,
1001            l0_sub_level_id,
1002            removed_table_ids,
1003            inserted_table_infos,
1004            vnode_partition_count,
1005            compaction_group_version_id,
1006        }
1007    }
1008}
1009
/// In-memory form of a `PbGroupDelta`, generic over the SST info representation `T`.
///
/// Each variant corresponds to one variant of the protobuf `PbDeltaType` oneof; the `From`
/// impls in this file map between the two.
#[derive(Debug, PartialEq, Clone)]
pub enum GroupDeltaCommon<T> {
    /// SST infos taken from the `inserted_table_infos` of a `PbNewL0SubLevel`.
    NewL0SubLevel(Vec<T>),
    /// A delta on a single level, see [`IntraLevelDeltaCommon`].
    IntraLevel(IntraLevelDeltaCommon<T>),
    /// Wraps the protobuf `PbGroupConstruct` message as-is, behind a `Box`.
    GroupConstruct(Box<PbGroupConstruct>),
    /// Wraps the protobuf `PbGroupDestroy` message as-is.
    GroupDestroy(PbGroupDestroy),
    /// Wraps the protobuf `PbGroupMerge` message as-is.
    GroupMerge(PbGroupMerge),
    /// Table ids taken from the `table_ids` of a `PbTruncateTables`, stored as a set.
    TruncateTables(HashSet<TableId>),
}
1019
/// [`GroupDeltaCommon`] specialized to [`SstableInfo`].
pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
1021
1022impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
1023where
1024    T: From<PbSstableInfo>,
1025{
1026    fn from(pb_group_delta: PbGroupDelta) -> Self {
1027        match pb_group_delta.delta_type {
1028            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1029                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1030            }
1031            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1032                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
1033            }
1034            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1035                GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1036            }
1037            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1038                GroupDeltaCommon::GroupMerge(pb_group_merge)
1039            }
1040            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1041                pb_new_sub_level
1042                    .inserted_table_infos
1043                    .into_iter()
1044                    .map(T::from)
1045                    .collect(),
1046            ),
1047            Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1048                GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids.into_iter().collect())
1049            }
1050
1051            None => panic!("delta_type is not set"),
1052        }
1053    }
1054}
1055
1056impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1057where
1058    PbSstableInfo: From<T>,
1059{
1060    fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1061        match group_delta {
1062            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1063                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1064            },
1065            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1066                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1067            },
1068            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1069                delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1070            },
1071            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1072                delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1073            },
1074            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1075                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1076                    inserted_table_infos: new_sub_level
1077                        .into_iter()
1078                        .map(PbSstableInfo::from)
1079                        .collect(),
1080                })),
1081            },
1082            GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1083                delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1084                    table_ids: table_ids.iter().copied().collect(),
1085                })),
1086            },
1087        }
1088    }
1089}
1090
1091impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1092where
1093    PbSstableInfo: for<'a> From<&'a T>,
1094{
1095    fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1096        match group_delta {
1097            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1098                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1099            },
1100            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1101                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1102            },
1103            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1104                delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1105            },
1106            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1107                delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1108            },
1109            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1110                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1111                    inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1112                })),
1113            },
1114            GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1115                delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1116                    table_ids: table_ids.iter().copied().collect(),
1117                })),
1118            },
1119        }
1120    }
1121}
1122
1123impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1124where
1125    T: for<'a> From<&'a PbSstableInfo>,
1126{
1127    fn from(pb_group_delta: &PbGroupDelta) -> Self {
1128        match &pb_group_delta.delta_type {
1129            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1130                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1131            }
1132            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1133                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1134            }
1135            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1136                GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1137            }
1138            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1139                GroupDeltaCommon::GroupMerge(*pb_group_merge)
1140            }
1141            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1142                pb_new_sub_level
1143                    .inserted_table_infos
1144                    .iter()
1145                    .map(T::from)
1146                    .collect(),
1147            ),
1148            Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1149                GroupDeltaCommon::TruncateTables(
1150                    pb_truncate_tables.table_ids.iter().copied().collect(),
1151                )
1152            }
1153            None => panic!("delta_type is not set"),
1154        }
1155    }
1156}
1157
/// In-memory form of `PbGroupDeltas`: an ordered list of [`GroupDeltaCommon`] values,
/// generic over the SST info representation `T`.
#[derive(Debug, PartialEq, Clone)]
pub struct GroupDeltasCommon<T> {
    /// The deltas, in the same order as in the protobuf message.
    pub group_deltas: Vec<GroupDeltaCommon<T>>,
}
1162
1163impl<T> Default for GroupDeltasCommon<T> {
1164    fn default() -> Self {
1165        Self {
1166            group_deltas: vec![],
1167        }
1168    }
1169}
1170
/// [`GroupDeltasCommon`] specialized to [`SstableInfo`].
pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1172
1173impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1174where
1175    T: From<PbSstableInfo>,
1176{
1177    fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1178        Self {
1179            group_deltas: pb_group_deltas
1180                .group_deltas
1181                .into_iter()
1182                .map(GroupDeltaCommon::from)
1183                .collect_vec(),
1184        }
1185    }
1186}
1187
1188impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1189where
1190    PbSstableInfo: From<T>,
1191{
1192    fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1193        Self {
1194            group_deltas: group_deltas
1195                .group_deltas
1196                .into_iter()
1197                .map(|group_delta| group_delta.into())
1198                .collect_vec(),
1199        }
1200    }
1201}
1202
1203impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1204where
1205    PbSstableInfo: for<'a> From<&'a T>,
1206{
1207    fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1208        Self {
1209            group_deltas: group_deltas
1210                .group_deltas
1211                .iter()
1212                .map(|group_delta| group_delta.into())
1213                .collect_vec(),
1214        }
1215    }
1216}
1217
1218impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1219where
1220    T: for<'a> From<&'a PbSstableInfo>,
1221{
1222    fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1223        Self {
1224            group_deltas: pb_group_deltas
1225                .group_deltas
1226                .iter()
1227                .map(GroupDeltaCommon::from)
1228                .collect_vec(),
1229        }
1230    }
1231}
1232
1233impl<T> GroupDeltasCommon<T>
1234where
1235    PbSstableInfo: for<'a> From<&'a T>,
1236{
1237    pub fn to_protobuf(&self) -> PbGroupDeltas {
1238        self.into()
1239    }
1240}
1241
1242impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1243    #[expect(deprecated)]
1244    fn from(delta: HummockVersionDelta) -> Self {
1245        Self {
1246            id: delta.id,
1247            prev_id: delta.prev_id,
1248            group_deltas: delta.group_deltas,
1249            max_committed_epoch: delta.max_committed_epoch,
1250            trivial_move: delta.trivial_move,
1251            new_table_watermarks: delta.new_table_watermarks,
1252            removed_table_ids: delta.removed_table_ids,
1253            change_log_delta: delta
1254                .change_log_delta
1255                .into_iter()
1256                .map(|(k, v)| {
1257                    (
1258                        k,
1259                        ChangeLogDeltaCommon {
1260                            truncate_epoch: v.truncate_epoch,
1261                            new_log: EpochNewChangeLogCommon {
1262                                new_value: Vec::new(),
1263                                old_value: Vec::new(),
1264                                non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1265                                checkpoint_epoch: v.new_log.checkpoint_epoch,
1266                            },
1267                        },
1268                    )
1269                })
1270                .collect(),
1271            state_table_info_delta: delta.state_table_info_delta,
1272            vector_index_delta: delta.vector_index_delta,
1273        }
1274    }
1275}
1276
1277impl From<HummockVersion> for LocalHummockVersion {
1278    #[expect(deprecated)]
1279    fn from(version: HummockVersion) -> Self {
1280        Self {
1281            id: version.id,
1282            levels: version.levels,
1283            max_committed_epoch: version.max_committed_epoch,
1284            table_watermarks: version.table_watermarks,
1285            table_change_log: HashMap::default(),
1286            state_table_info: version.state_table_info,
1287            vector_indexes: version.vector_indexes,
1288        }
1289    }
1290}