// risingwave_hummock_sdk/version.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{
30    ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
31};
32use crate::compaction_group::StaticCompactionGroupId;
33use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
34use crate::level::LevelsCommon;
35use crate::sstable_info::SstableInfo;
36use crate::table_watermark::TableWatermarks;
37use crate::vector_index::{VectorIndex, VectorIndexDelta};
38use crate::{
39    CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
40    HummockSstableObjectId, HummockVersionId,
41};
42
43pub const MAX_HUMMOCK_VERSION_ID: HummockVersionId = HummockVersionId::new(i64::MAX as _);
44
45#[derive(Debug, Clone, PartialEq)]
46pub struct HummockVersionStateTableInfo {
47    state_table_info: HashMap<TableId, PbStateTableInfo>,
48
49    // in memory index
50    compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
51}
52
53impl HummockVersionStateTableInfo {
54    pub fn empty() -> Self {
55        Self {
56            state_table_info: HashMap::new(),
57            compaction_group_member_tables: HashMap::new(),
58        }
59    }
60
61    fn build_compaction_group_member_tables(
62        state_table_info: &HashMap<TableId, PbStateTableInfo>,
63    ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
64        let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
65        for (table_id, info) in state_table_info {
66            assert!(
67                ret.entry(info.compaction_group_id)
68                    .or_default()
69                    .insert(*table_id)
70            );
71        }
72        ret
73    }
74
75    pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
76        self.state_table_info
77            .iter()
78            .map(|(table_id, info)| (*table_id, info.compaction_group_id))
79            .collect()
80    }
81
82    pub fn from_protobuf(state_table_info: &HashMap<TableId, PbStateTableInfo>) -> Self {
83        let state_table_info = state_table_info
84            .iter()
85            .map(|(table_id, info)| (*table_id, *info))
86            .collect();
87        let compaction_group_member_tables =
88            Self::build_compaction_group_member_tables(&state_table_info);
89        Self {
90            state_table_info,
91            compaction_group_member_tables,
92        }
93    }
94
95    pub fn apply_delta(
96        &mut self,
97        delta: &HashMap<TableId, StateTableInfoDelta>,
98        removed_table_id: &HashSet<TableId>,
99    ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
100        let mut changed_table = HashMap::new();
101        let mut has_bumped_committed_epoch = false;
102        fn remove_table_from_compaction_group(
103            compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
104            compaction_group_id: CompactionGroupId,
105            table_id: TableId,
106        ) {
107            let member_tables = compaction_group_member_tables
108                .get_mut(&compaction_group_id)
109                .expect("should exist");
110            assert!(member_tables.remove(&table_id));
111            if member_tables.is_empty() {
112                assert!(
113                    compaction_group_member_tables
114                        .remove(&compaction_group_id)
115                        .is_some()
116                );
117            }
118        }
119        for table_id in removed_table_id {
120            if let Some(prev_info) = self.state_table_info.remove(table_id) {
121                remove_table_from_compaction_group(
122                    &mut self.compaction_group_member_tables,
123                    prev_info.compaction_group_id,
124                    *table_id,
125                );
126                assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
127            } else {
128                warn!(
129                    %table_id,
130                    "table to remove does not exist"
131                );
132            }
133        }
134        for (table_id, delta) in delta {
135            if removed_table_id.contains(table_id) {
136                continue;
137            }
138            let new_info = StateTableInfo {
139                committed_epoch: delta.committed_epoch,
140                compaction_group_id: delta.compaction_group_id,
141            };
142            match self.state_table_info.entry(*table_id) {
143                Entry::Occupied(mut entry) => {
144                    let prev_info = entry.get_mut();
145                    assert!(
146                        new_info.committed_epoch >= prev_info.committed_epoch,
147                        "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
148                        table_id,
149                        prev_info,
150                        new_info
151                    );
152                    if new_info.committed_epoch > prev_info.committed_epoch {
153                        has_bumped_committed_epoch = true;
154                    }
155                    if prev_info.compaction_group_id != new_info.compaction_group_id {
156                        // table moved to another compaction group
157                        remove_table_from_compaction_group(
158                            &mut self.compaction_group_member_tables,
159                            prev_info.compaction_group_id,
160                            *table_id,
161                        );
162                        assert!(
163                            self.compaction_group_member_tables
164                                .entry(new_info.compaction_group_id)
165                                .or_default()
166                                .insert(*table_id)
167                        );
168                    }
169                    let prev_info = replace(prev_info, new_info);
170                    changed_table.insert(*table_id, Some(prev_info));
171                }
172                Entry::Vacant(entry) => {
173                    assert!(
174                        self.compaction_group_member_tables
175                            .entry(new_info.compaction_group_id)
176                            .or_default()
177                            .insert(*table_id)
178                    );
179                    has_bumped_committed_epoch = true;
180                    entry.insert(new_info);
181                    changed_table.insert(*table_id, None);
182                }
183            }
184        }
185        debug_assert_eq!(
186            self.compaction_group_member_tables,
187            Self::build_compaction_group_member_tables(&self.state_table_info)
188        );
189        (changed_table, has_bumped_committed_epoch)
190    }
191
192    pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
193        &self.state_table_info
194    }
195
196    pub fn compaction_group_member_table_ids(
197        &self,
198        compaction_group_id: CompactionGroupId,
199    ) -> &BTreeSet<TableId> {
200        static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
201        self.compaction_group_member_tables
202            .get(&compaction_group_id)
203            .unwrap_or_else(|| EMPTY_SET.deref())
204    }
205
206    pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
207        &self.compaction_group_member_tables
208    }
209
210    pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
211        self.state_table_info
212            .values()
213            .map(|info| info.committed_epoch)
214            .max()
215    }
216}
217
218#[derive(Debug, Clone, PartialEq)]
219pub struct HummockVersionCommon<T, L = T> {
220    pub id: HummockVersionId,
221    pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
222    #[deprecated]
223    pub(crate) max_committed_epoch: u64,
224    pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
225    pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
226    pub state_table_info: HummockVersionStateTableInfo,
227    pub vector_indexes: HashMap<TableId, VectorIndex>,
228}
229
230pub type HummockVersion = HummockVersionCommon<SstableInfo>;
231
232pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
233
234impl Default for HummockVersion {
235    fn default() -> Self {
236        HummockVersion::from(&PbHummockVersion::default())
237    }
238}
239
240impl<T> HummockVersionCommon<T>
241where
242    T: for<'a> From<&'a PbSstableInfo>,
243    PbSstableInfo: for<'a> From<&'a T>,
244{
245    /// Convert the `PbHummockVersion` received from rpc to `HummockVersion`. No need to
246    /// maintain backward compatibility.
247    pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
248        pb_version.into()
249    }
250
251    /// Convert the `PbHummockVersion` deserialized from persisted state to `HummockVersion`.
252    /// We should maintain backward compatibility.
253    pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
254        pb_version.into()
255    }
256
257    pub fn to_protobuf(&self) -> PbHummockVersion {
258        self.into()
259    }
260}
261
262impl HummockVersion {
263    pub fn estimated_encode_len(&self) -> usize {
264        self.levels.len() * size_of::<CompactionGroupId>()
265            + self
266                .levels
267                .values()
268                .map(|level| level.estimated_encode_len())
269                .sum::<usize>()
270            + self.table_watermarks.len() * size_of::<u32>()
271            + self
272                .table_watermarks
273                .values()
274                .map(|table_watermark| table_watermark.estimated_encode_len())
275                .sum::<usize>()
276            + self
277                .table_change_log
278                .values()
279                .map(|c| {
280                    c.iter()
281                        .map(|l| {
282                            l.old_value
283                                .iter()
284                                .chain(l.new_value.iter())
285                                .map(|s| s.estimated_encode_len())
286                                .sum::<usize>()
287                        })
288                        .sum::<usize>()
289                })
290                .sum::<usize>()
291    }
292}
293
294impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
295where
296    T: for<'a> From<&'a PbSstableInfo>,
297{
298    fn from(pb_version: &PbHummockVersion) -> Self {
299        #[expect(deprecated)]
300        Self {
301            id: pb_version.id,
302            levels: pb_version
303                .levels
304                .iter()
305                .map(|(group_id, levels)| (*group_id, LevelsCommon::from(levels)))
306                .collect(),
307            max_committed_epoch: pb_version.max_committed_epoch,
308            table_watermarks: pb_version
309                .table_watermarks
310                .iter()
311                .map(|(table_id, table_watermark)| {
312                    (*table_id, Arc::new(TableWatermarks::from(table_watermark)))
313                })
314                .collect(),
315            table_change_log: pb_version
316                .table_change_logs
317                .iter()
318                .map(|(table_id, change_log)| {
319                    (*table_id, TableChangeLogCommon::from_protobuf(change_log))
320                })
321                .collect(),
322            state_table_info: HummockVersionStateTableInfo::from_protobuf(
323                &pb_version.state_table_info,
324            ),
325            vector_indexes: pb_version
326                .vector_indexes
327                .iter()
328                .map(|(table_id, index)| (*table_id, index.clone().into()))
329                .collect(),
330        }
331    }
332}
333
334impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
335where
336    PbSstableInfo: for<'a> From<&'a T>,
337{
338    fn from(version: &HummockVersionCommon<T>) -> Self {
339        #[expect(deprecated)]
340        Self {
341            id: version.id,
342            levels: version
343                .levels
344                .iter()
345                .map(|(group_id, levels)| (*group_id, levels.into()))
346                .collect(),
347            max_committed_epoch: version.max_committed_epoch,
348            table_watermarks: version
349                .table_watermarks
350                .iter()
351                .map(|(table_id, watermark)| (*table_id, watermark.as_ref().into()))
352                .collect(),
353            table_change_logs: version
354                .table_change_log
355                .iter()
356                .map(|(table_id, change_log)| (*table_id, change_log.to_protobuf()))
357                .collect(),
358            state_table_info: version.state_table_info.state_table_info.clone(),
359            vector_indexes: version
360                .vector_indexes
361                .iter()
362                .map(|(table_id, index)| (*table_id, index.clone().into()))
363                .collect(),
364        }
365    }
366}
367
368impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
369where
370    PbSstableInfo: From<T>,
371    PbSstableInfo: for<'a> From<&'a T>,
372{
373    fn from(version: HummockVersionCommon<T>) -> Self {
374        #[expect(deprecated)]
375        Self {
376            id: version.id,
377            levels: version
378                .levels
379                .into_iter()
380                .map(|(group_id, levels)| (group_id, levels.into()))
381                .collect(),
382            max_committed_epoch: version.max_committed_epoch,
383            table_watermarks: version
384                .table_watermarks
385                .into_iter()
386                .map(|(table_id, watermark)| (table_id, watermark.as_ref().into()))
387                .collect(),
388            table_change_logs: version
389                .table_change_log
390                .into_iter()
391                .map(|(table_id, change_log)| (table_id, change_log.to_protobuf()))
392                .collect(),
393            state_table_info: version.state_table_info.state_table_info.clone(),
394            vector_indexes: version
395                .vector_indexes
396                .into_iter()
397                .map(|(table_id, index)| (table_id, index.into()))
398                .collect(),
399        }
400    }
401}
402
403impl HummockVersion {
404    pub fn next_version_id(&self) -> HummockVersionId {
405        self.id + 1
406    }
407
408    pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
409        // for backward-compatibility of previous hummock version delta
410        self.state_table_info.state_table_info.is_empty()
411            && self.levels.values().any(|group| {
412                // state_table_info is not previously filled, but there previously exists some tables
413                #[expect(deprecated)]
414                !group.member_table_ids.is_empty()
415            })
416    }
417
418    pub fn may_fill_backward_compatible_state_table_info_delta(
419        &self,
420        delta: &mut HummockVersionDelta,
421    ) {
422        #[expect(deprecated)]
423        // for backward-compatibility of previous hummock version delta
424        for (cg_id, group) in &self.levels {
425            for table_id in &group.member_table_ids {
426                assert!(
427                    delta
428                        .state_table_info_delta
429                        .insert(
430                            (*table_id).into(),
431                            StateTableInfoDelta {
432                                committed_epoch: self.max_committed_epoch,
433                                compaction_group_id: *cg_id,
434                            }
435                        )
436                        .is_none(),
437                    "duplicate table id {} in cg {}",
438                    table_id,
439                    cg_id
440                );
441            }
442        }
443    }
444
445    pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
446        #[expect(deprecated)]
447        let mut init_version = HummockVersion {
448            id: FIRST_VERSION_ID,
449            levels: Default::default(),
450            max_committed_epoch: INVALID_EPOCH,
451            table_watermarks: HashMap::new(),
452            table_change_log: HashMap::new(),
453            state_table_info: HummockVersionStateTableInfo::empty(),
454            vector_indexes: Default::default(),
455        };
456        for group_id in [
457            StaticCompactionGroupId::StateDefault as CompactionGroupId,
458            StaticCompactionGroupId::MaterializedView as CompactionGroupId,
459        ] {
460            init_version.levels.insert(
461                group_id,
462                build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
463            );
464        }
465        init_version
466    }
467
468    pub fn version_delta_after(&self) -> HummockVersionDelta {
469        #[expect(deprecated)]
470        HummockVersionDelta {
471            id: self.next_version_id(),
472            prev_id: self.id,
473            trivial_move: false,
474            max_committed_epoch: self.max_committed_epoch,
475            group_deltas: Default::default(),
476            new_table_watermarks: HashMap::new(),
477            removed_table_ids: HashSet::new(),
478            change_log_delta: HashMap::new(),
479            state_table_info_delta: Default::default(),
480            vector_index_delta: Default::default(),
481        }
482    }
483
484    pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
485        let table_change_log = {
486            let mut table_change_log = HashMap::new();
487            for (table_id, log) in &mut self.table_change_log {
488                let change_log_iter =
489                    log.change_log_iter_mut()
490                        .map(|item| EpochNewChangeLogCommon {
491                            new_value: std::mem::take(&mut item.new_value),
492                            old_value: std::mem::take(&mut item.old_value),
493                            non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
494                            checkpoint_epoch: item.checkpoint_epoch,
495                        });
496                table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
497            }
498
499            table_change_log
500        };
501
502        let local_version = LocalHummockVersion::from(self);
503
504        (local_version, table_change_log)
505    }
506}
507
508impl<T, L> HummockVersionCommon<T, L> {
509    pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
510        self.state_table_info
511            .info()
512            .get(&table_id)
513            .map(|info| info.committed_epoch)
514    }
515}
516
517#[derive(Debug, PartialEq, Clone)]
518pub struct HummockVersionDeltaCommon<T, L = T> {
519    pub id: HummockVersionId,
520    pub prev_id: HummockVersionId,
521    pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
522    #[deprecated]
523    pub(crate) max_committed_epoch: u64,
524    pub trivial_move: bool,
525    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
526    pub removed_table_ids: HashSet<TableId>,
527    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
528    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
529    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
530}
531
532pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;
533
534pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
535
536impl Default for HummockVersionDelta {
537    fn default() -> Self {
538        HummockVersionDelta::from(&PbHummockVersionDelta::default())
539    }
540}
541
542impl<T> HummockVersionDeltaCommon<T>
543where
544    T: for<'a> From<&'a PbSstableInfo>,
545    PbSstableInfo: for<'a> From<&'a T>,
546{
547    /// Convert the `PbHummockVersionDelta` deserialized from persisted state to `HummockVersionDelta`.
548    /// We should maintain backward compatibility.
549    pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
550        delta.into()
551    }
552
553    /// Convert the `PbHummockVersionDelta` received from rpc to `HummockVersionDelta`. No need to
554    /// maintain backward compatibility.
555    pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
556        delta.into()
557    }
558
559    pub fn to_protobuf(&self) -> PbHummockVersionDelta {
560        self.into()
561    }
562}
563
564pub trait SstableIdReader {
565    fn sst_id(&self) -> HummockSstableId;
566}
567
568pub trait ObjectIdReader {
569    fn object_id(&self) -> HummockSstableObjectId;
570}
571
572impl<T> HummockVersionDeltaCommon<T>
573where
574    T: SstableIdReader + ObjectIdReader,
575{
576    /// Get the newly added object ids from the version delta.
577    ///
578    /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`,
579    /// but it is possible that the object is moved or split from other compaction groups or levels.
580    pub fn newly_added_object_ids(
581        &self,
582        exclude_table_change_log: bool,
583    ) -> HashSet<HummockObjectId> {
584        // DO NOT REMOVE THIS LINE
585        // This is to ensure that when adding new variant to `HummockObjectId`,
586        // the compiler will warn us if we forget to handle it here.
587        match HummockObjectId::Sstable(0.into()) {
588            HummockObjectId::Sstable(_) => {}
589            HummockObjectId::VectorFile(_) => {}
590            HummockObjectId::HnswGraphFile(_) => {}
591        };
592        self.newly_added_sst_infos(exclude_table_change_log)
593            .map(|sst| HummockObjectId::Sstable(sst.object_id()))
594            .chain(
595                self.vector_index_delta
596                    .values()
597                    .flat_map(|vector_index_delta| {
598                        vector_index_delta
599                            .newly_added_objects()
600                            .map(|(object_id, _)| object_id)
601                    }),
602            )
603            .collect()
604    }
605
606    pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
607        self.newly_added_sst_infos(exclude_table_change_log)
608            .map(|sst| sst.sst_id())
609            .collect()
610    }
611}
612
613impl<T> HummockVersionDeltaCommon<T> {
614    pub fn newly_added_sst_infos(
615        &self,
616        exclude_table_change_log: bool,
617    ) -> impl Iterator<Item = &'_ T> {
618        let may_table_change_delta = if exclude_table_change_log {
619            None
620        } else {
621            Some(self.change_log_delta.values())
622        };
623        self.group_deltas
624            .values()
625            .flat_map(|group_deltas| {
626                group_deltas.group_deltas.iter().flat_map(|group_delta| {
627                    let sst_slice = match &group_delta {
628                        GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
629                        | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
630                            inserted_table_infos,
631                            ..
632                        }) => Some(inserted_table_infos.iter()),
633                        GroupDeltaCommon::GroupConstruct(_)
634                        | GroupDeltaCommon::GroupDestroy(_)
635                        | GroupDeltaCommon::GroupMerge(_)
636                        | GroupDeltaCommon::TruncateTables(_) => None,
637                    };
638                    sst_slice.into_iter().flatten()
639                })
640            })
641            .chain(
642                may_table_change_delta
643                    .map(|v| {
644                        v.flat_map(|delta| {
645                            let new_log = &delta.new_log;
646                            new_log.new_value.iter().chain(new_log.old_value.iter())
647                        })
648                    })
649                    .into_iter()
650                    .flatten(),
651            )
652    }
653}
654
655impl HummockVersionDelta {
656    #[expect(deprecated)]
657    pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
658        self.max_committed_epoch
659    }
660}
661
662impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
663where
664    T: for<'a> From<&'a PbSstableInfo>,
665{
666    fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
667        #[expect(deprecated)]
668        Self {
669            id: pb_version_delta.id,
670            prev_id: pb_version_delta.prev_id,
671            group_deltas: pb_version_delta
672                .group_deltas
673                .iter()
674                .map(|(group_id, deltas)| (*group_id, GroupDeltasCommon::from(deltas)))
675                .collect(),
676            max_committed_epoch: pb_version_delta.max_committed_epoch,
677            trivial_move: pb_version_delta.trivial_move,
678            new_table_watermarks: pb_version_delta
679                .new_table_watermarks
680                .iter()
681                .map(|(table_id, watermarks)| (*table_id, TableWatermarks::from(watermarks)))
682                .collect(),
683            removed_table_ids: pb_version_delta.removed_table_ids.iter().copied().collect(),
684            change_log_delta: pb_version_delta
685                .change_log_delta
686                .iter()
687                .map(|(table_id, log_delta)| {
688                    (
689                        *table_id,
690                        ChangeLogDeltaCommon {
691                            truncate_epoch: log_delta.truncate_epoch,
692                            new_log: log_delta.new_log.as_ref().unwrap().into(),
693                        },
694                    )
695                })
696                .collect(),
697
698            state_table_info_delta: pb_version_delta
699                .state_table_info_delta
700                .iter()
701                .map(|(table_id, delta)| (*table_id, *delta))
702                .collect(),
703            vector_index_delta: pb_version_delta
704                .vector_index_delta
705                .iter()
706                .map(|(table_id, delta)| (*table_id, delta.clone().into()))
707                .collect(),
708        }
709    }
710}
711
712impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
713where
714    PbSstableInfo: for<'a> From<&'a T>,
715{
716    fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
717        #[expect(deprecated)]
718        Self {
719            id: version_delta.id,
720            prev_id: version_delta.prev_id,
721            group_deltas: version_delta
722                .group_deltas
723                .iter()
724                .map(|(group_id, deltas)| (*group_id, deltas.into()))
725                .collect(),
726            max_committed_epoch: version_delta.max_committed_epoch,
727            trivial_move: version_delta.trivial_move,
728            new_table_watermarks: version_delta
729                .new_table_watermarks
730                .iter()
731                .map(|(table_id, watermarks)| (*table_id, watermarks.into()))
732                .collect(),
733            removed_table_ids: version_delta.removed_table_ids.iter().copied().collect(),
734            change_log_delta: version_delta
735                .change_log_delta
736                .iter()
737                .map(|(table_id, log_delta)| (*table_id, log_delta.into()))
738                .collect(),
739            state_table_info_delta: version_delta.state_table_info_delta.clone(),
740            vector_index_delta: version_delta
741                .vector_index_delta
742                .iter()
743                .map(|(table_id, delta)| (*table_id, delta.clone().into()))
744                .collect(),
745        }
746    }
747}
748
749impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
750where
751    PbSstableInfo: From<T>,
752{
753    fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
754        #[expect(deprecated)]
755        Self {
756            id: version_delta.id,
757            prev_id: version_delta.prev_id,
758            group_deltas: version_delta
759                .group_deltas
760                .into_iter()
761                .map(|(group_id, deltas)| (group_id, deltas.into()))
762                .collect(),
763            max_committed_epoch: version_delta.max_committed_epoch,
764            trivial_move: version_delta.trivial_move,
765            new_table_watermarks: version_delta
766                .new_table_watermarks
767                .into_iter()
768                .map(|(table_id, watermarks)| (table_id, watermarks.into()))
769                .collect(),
770            removed_table_ids: version_delta.removed_table_ids.into_iter().collect(),
771            change_log_delta: version_delta
772                .change_log_delta
773                .into_iter()
774                .map(|(table_id, log_delta)| (table_id, log_delta.into()))
775                .collect(),
776            state_table_info_delta: version_delta.state_table_info_delta,
777            vector_index_delta: version_delta
778                .vector_index_delta
779                .into_iter()
780                .map(|(table_id, delta)| (table_id, delta.into()))
781                .collect(),
782        }
783    }
784}
785
786impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
787where
788    T: From<PbSstableInfo>,
789{
790    fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
791        #[expect(deprecated)]
792        Self {
793            id: pb_version_delta.id,
794            prev_id: pb_version_delta.prev_id,
795            group_deltas: pb_version_delta
796                .group_deltas
797                .into_iter()
798                .map(|(group_id, deltas)| (group_id, deltas.into()))
799                .collect(),
800            max_committed_epoch: pb_version_delta.max_committed_epoch,
801            trivial_move: pb_version_delta.trivial_move,
802            new_table_watermarks: pb_version_delta
803                .new_table_watermarks
804                .into_iter()
805                .map(|(table_id, watermarks)| (table_id, watermarks.into()))
806                .collect(),
807            removed_table_ids: pb_version_delta.removed_table_ids.into_iter().collect(),
808            change_log_delta: pb_version_delta
809                .change_log_delta
810                .iter()
811                .map(|(table_id, log_delta)| {
812                    (
813                        *table_id,
814                        ChangeLogDeltaCommon {
815                            new_log: log_delta.new_log.clone().unwrap().into(),
816                            truncate_epoch: log_delta.truncate_epoch,
817                        },
818                    )
819                })
820                .collect(),
821            state_table_info_delta: pb_version_delta
822                .state_table_info_delta
823                .iter()
824                .map(|(table_id, delta)| (*table_id, *delta))
825                .collect(),
826            vector_index_delta: pb_version_delta
827                .vector_index_delta
828                .into_iter()
829                .map(|(table_id, delta)| (table_id, delta.into()))
830                .collect(),
831        }
832    }
833}
834
/// In-memory form of [`PbIntraLevelDelta`], generic over the SST info type `T`.
///
/// The `From` impls in this file convert field-for-field to and from the protobuf
/// message. The one difference is that `removed_table_ids` is held as a `HashSet`
/// here, so duplicate ids from the protobuf side collapse into one entry.
#[derive(Debug, PartialEq, Clone)]
pub struct IntraLevelDeltaCommon<T> {
    /// Mirrors `PbIntraLevelDelta::level_idx`.
    pub level_idx: u32,
    /// Mirrors `PbIntraLevelDelta::l0_sub_level_id`.
    pub l0_sub_level_id: u64,
    /// Ids listed in `PbIntraLevelDelta::removed_table_ids`, de-duplicated.
    pub removed_table_ids: HashSet<HummockSstableId>,
    /// SST infos listed in `PbIntraLevelDelta::inserted_table_infos`, in the same order.
    pub inserted_table_infos: Vec<T>,
    /// Mirrors `PbIntraLevelDelta::vnode_partition_count`.
    pub vnode_partition_count: u32,
    /// Mirrors `PbIntraLevelDelta::compaction_group_version_id`.
    pub compaction_group_version_id: u64,
}

/// Intra-level delta that carries full [`SstableInfo`]s.
pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
846
847impl IntraLevelDelta {
848    pub fn estimated_encode_len(&self) -> usize {
849        size_of::<u32>()
850            + size_of::<u64>()
851            + self.removed_table_ids.len() * size_of::<u32>()
852            + self
853                .inserted_table_infos
854                .iter()
855                .map(|sst| sst.estimated_encode_len())
856                .sum::<usize>()
857            + size_of::<u32>()
858    }
859}
860
861impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
862where
863    T: From<PbSstableInfo>,
864{
865    fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
866        Self {
867            level_idx: pb_intra_level_delta.level_idx,
868            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
869            removed_table_ids: HashSet::from_iter(
870                pb_intra_level_delta.removed_table_ids.iter().copied(),
871            ),
872            inserted_table_infos: pb_intra_level_delta
873                .inserted_table_infos
874                .into_iter()
875                .map(Into::into)
876                .collect_vec(),
877            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
878            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
879        }
880    }
881}
882
883impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
884where
885    PbSstableInfo: From<T>,
886{
887    fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
888        Self {
889            level_idx: intra_level_delta.level_idx,
890            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
891            removed_table_ids: intra_level_delta.removed_table_ids.into_iter().collect(),
892            inserted_table_infos: intra_level_delta
893                .inserted_table_infos
894                .into_iter()
895                .map(Into::into)
896                .collect_vec(),
897            vnode_partition_count: intra_level_delta.vnode_partition_count,
898            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
899        }
900    }
901}
902
903impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
904where
905    PbSstableInfo: for<'a> From<&'a T>,
906{
907    fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
908        Self {
909            level_idx: intra_level_delta.level_idx,
910            l0_sub_level_id: intra_level_delta.l0_sub_level_id,
911            removed_table_ids: intra_level_delta
912                .removed_table_ids
913                .iter()
914                .copied()
915                .collect(),
916            inserted_table_infos: intra_level_delta
917                .inserted_table_infos
918                .iter()
919                .map(Into::into)
920                .collect_vec(),
921            vnode_partition_count: intra_level_delta.vnode_partition_count,
922            compaction_group_version_id: intra_level_delta.compaction_group_version_id,
923        }
924    }
925}
926
927impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
928where
929    T: for<'a> From<&'a PbSstableInfo>,
930{
931    fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
932        Self {
933            level_idx: pb_intra_level_delta.level_idx,
934            l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
935            removed_table_ids: HashSet::from_iter(
936                pb_intra_level_delta.removed_table_ids.iter().copied(),
937            ),
938            inserted_table_infos: pb_intra_level_delta
939                .inserted_table_infos
940                .iter()
941                .map(Into::into)
942                .collect_vec(),
943            vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
944            compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
945        }
946    }
947}
948
949impl IntraLevelDelta {
950    pub fn new(
951        level_idx: u32,
952        l0_sub_level_id: u64,
953        removed_table_ids: HashSet<HummockSstableId>,
954        inserted_table_infos: Vec<SstableInfo>,
955        vnode_partition_count: u32,
956        compaction_group_version_id: u64,
957    ) -> Self {
958        Self {
959            level_idx,
960            l0_sub_level_id,
961            removed_table_ids,
962            inserted_table_infos,
963            vnode_partition_count,
964            compaction_group_version_id,
965        }
966    }
967}
968
/// In-memory form of the `delta_type` carried by a [`PbGroupDelta`], generic over
/// the SST info type `T`.
///
/// The protobuf field is optional, but this enum always holds a variant. The
/// `From` conversions in this file panic when they meet a `PbGroupDelta` whose
/// `delta_type` is unset.
#[derive(Debug, PartialEq, Clone)]
pub enum GroupDeltaCommon<T> {
    /// SST infos taken from `PbNewL0SubLevel::inserted_table_infos`.
    NewL0SubLevel(Vec<T>),
    /// Converted form of `PbDeltaType::IntraLevel`.
    IntraLevel(IntraLevelDeltaCommon<T>),
    /// Protobuf payload kept as-is. NOTE(review): presumably boxed to keep the enum
    /// small; confirm before unboxing.
    GroupConstruct(Box<PbGroupConstruct>),
    /// Protobuf payload kept as-is.
    GroupDestroy(PbGroupDestroy),
    /// Protobuf payload kept as-is.
    GroupMerge(PbGroupMerge),
    /// Table ids taken from `PbTruncateTables::table_ids`, de-duplicated.
    TruncateTables(HashSet<TableId>),
}

/// Group delta that carries full [`SstableInfo`]s.
pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
980
981impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
982where
983    T: From<PbSstableInfo>,
984{
985    fn from(pb_group_delta: PbGroupDelta) -> Self {
986        match pb_group_delta.delta_type {
987            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
988                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
989            }
990            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
991                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
992            }
993            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
994                GroupDeltaCommon::GroupDestroy(pb_group_destroy)
995            }
996            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
997                GroupDeltaCommon::GroupMerge(pb_group_merge)
998            }
999            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1000                pb_new_sub_level
1001                    .inserted_table_infos
1002                    .into_iter()
1003                    .map(T::from)
1004                    .collect(),
1005            ),
1006            Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1007                GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids.into_iter().collect())
1008            }
1009
1010            None => panic!("delta_type is not set"),
1011        }
1012    }
1013}
1014
1015impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1016where
1017    PbSstableInfo: From<T>,
1018{
1019    fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1020        match group_delta {
1021            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1022                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1023            },
1024            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1025                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1026            },
1027            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1028                delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1029            },
1030            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1031                delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1032            },
1033            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1034                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1035                    inserted_table_infos: new_sub_level
1036                        .into_iter()
1037                        .map(PbSstableInfo::from)
1038                        .collect(),
1039                })),
1040            },
1041            GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1042                delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1043                    table_ids: table_ids.iter().copied().collect(),
1044                })),
1045            },
1046        }
1047    }
1048}
1049
1050impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1051where
1052    PbSstableInfo: for<'a> From<&'a T>,
1053{
1054    fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1055        match group_delta {
1056            GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1057                delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1058            },
1059            GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1060                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1061            },
1062            GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1063                delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1064            },
1065            GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1066                delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1067            },
1068            GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1069                delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1070                    inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1071                })),
1072            },
1073            GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1074                delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1075                    table_ids: table_ids.iter().copied().collect(),
1076                })),
1077            },
1078        }
1079    }
1080}
1081
1082impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1083where
1084    T: for<'a> From<&'a PbSstableInfo>,
1085{
1086    fn from(pb_group_delta: &PbGroupDelta) -> Self {
1087        match &pb_group_delta.delta_type {
1088            Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1089                GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1090            }
1091            Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1092                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1093            }
1094            Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1095                GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1096            }
1097            Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1098                GroupDeltaCommon::GroupMerge(*pb_group_merge)
1099            }
1100            Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1101                pb_new_sub_level
1102                    .inserted_table_infos
1103                    .iter()
1104                    .map(T::from)
1105                    .collect(),
1106            ),
1107            Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1108                GroupDeltaCommon::TruncateTables(
1109                    pb_truncate_tables.table_ids.iter().copied().collect(),
1110                )
1111            }
1112            None => panic!("delta_type is not set"),
1113        }
1114    }
1115}
1116
1117#[derive(Debug, PartialEq, Clone)]
1118pub struct GroupDeltasCommon<T> {
1119    pub group_deltas: Vec<GroupDeltaCommon<T>>,
1120}
1121
1122impl<T> Default for GroupDeltasCommon<T> {
1123    fn default() -> Self {
1124        Self {
1125            group_deltas: vec![],
1126        }
1127    }
1128}
1129
1130pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1131
1132impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1133where
1134    T: From<PbSstableInfo>,
1135{
1136    fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1137        Self {
1138            group_deltas: pb_group_deltas
1139                .group_deltas
1140                .into_iter()
1141                .map(GroupDeltaCommon::from)
1142                .collect_vec(),
1143        }
1144    }
1145}
1146
1147impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1148where
1149    PbSstableInfo: From<T>,
1150{
1151    fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1152        Self {
1153            group_deltas: group_deltas
1154                .group_deltas
1155                .into_iter()
1156                .map(|group_delta| group_delta.into())
1157                .collect_vec(),
1158        }
1159    }
1160}
1161
1162impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1163where
1164    PbSstableInfo: for<'a> From<&'a T>,
1165{
1166    fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1167        Self {
1168            group_deltas: group_deltas
1169                .group_deltas
1170                .iter()
1171                .map(|group_delta| group_delta.into())
1172                .collect_vec(),
1173        }
1174    }
1175}
1176
1177impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1178where
1179    T: for<'a> From<&'a PbSstableInfo>,
1180{
1181    fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1182        Self {
1183            group_deltas: pb_group_deltas
1184                .group_deltas
1185                .iter()
1186                .map(GroupDeltaCommon::from)
1187                .collect_vec(),
1188        }
1189    }
1190}
1191
1192impl<T> GroupDeltasCommon<T>
1193where
1194    PbSstableInfo: for<'a> From<&'a T>,
1195{
1196    pub fn to_protobuf(&self) -> PbGroupDeltas {
1197        self.into()
1198    }
1199}
1200
1201impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1202    #[expect(deprecated)]
1203    fn from(delta: HummockVersionDelta) -> Self {
1204        Self {
1205            id: delta.id,
1206            prev_id: delta.prev_id,
1207            group_deltas: delta.group_deltas,
1208            max_committed_epoch: delta.max_committed_epoch,
1209            trivial_move: delta.trivial_move,
1210            new_table_watermarks: delta.new_table_watermarks,
1211            removed_table_ids: delta.removed_table_ids,
1212            change_log_delta: delta
1213                .change_log_delta
1214                .into_iter()
1215                .map(|(k, v)| {
1216                    (
1217                        k,
1218                        ChangeLogDeltaCommon {
1219                            truncate_epoch: v.truncate_epoch,
1220                            new_log: EpochNewChangeLogCommon {
1221                                new_value: Vec::new(),
1222                                old_value: Vec::new(),
1223                                non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1224                                checkpoint_epoch: v.new_log.checkpoint_epoch,
1225                            },
1226                        },
1227                    )
1228                })
1229                .collect(),
1230            state_table_info_delta: delta.state_table_info_delta,
1231            vector_index_delta: delta.vector_index_delta,
1232        }
1233    }
1234}
1235
1236impl From<HummockVersion> for LocalHummockVersion {
1237    #[expect(deprecated)]
1238    fn from(version: HummockVersion) -> Self {
1239        Self {
1240            id: version.id,
1241            levels: version.levels,
1242            max_committed_epoch: version.max_committed_epoch,
1243            table_watermarks: version.table_watermarks,
1244            table_change_log: version
1245                .table_change_log
1246                .into_iter()
1247                .map(|(k, v)| {
1248                    let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1249                        .change_log_into_iter()
1250                        .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1251                            new_value: Vec::new(),
1252                            old_value: Vec::new(),
1253                            non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1254                            checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1255                        })
1256                        .collect();
1257                    (k, TableChangeLogCommon::new(epoch_new_change_logs))
1258                })
1259                .collect(),
1260            state_table_info: version.state_table_info,
1261            vector_indexes: version.vector_indexes,
1262        }
1263    }
1264}