1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon};
30use crate::compaction_group::StaticCompactionGroupId;
31use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
32use crate::level::LevelsCommon;
33use crate::sstable_info::SstableInfo;
34use crate::table_watermark::TableWatermarks;
35use crate::vector_index::{VectorIndex, VectorIndexDelta};
36use crate::{
37 CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
38 HummockSstableObjectId, HummockVersionId,
39};
40
41pub const MAX_HUMMOCK_VERSION_ID: HummockVersionId = HummockVersionId::new(i64::MAX as _);
42
43#[derive(Debug, Clone, PartialEq)]
44pub struct HummockVersionStateTableInfo {
45 state_table_info: HashMap<TableId, PbStateTableInfo>,
46
47 compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
49}
50
51impl HummockVersionStateTableInfo {
52 pub fn empty() -> Self {
53 Self {
54 state_table_info: HashMap::new(),
55 compaction_group_member_tables: HashMap::new(),
56 }
57 }
58
59 fn build_compaction_group_member_tables(
60 state_table_info: &HashMap<TableId, PbStateTableInfo>,
61 ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
62 let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
63 for (table_id, info) in state_table_info {
64 assert!(
65 ret.entry(info.compaction_group_id)
66 .or_default()
67 .insert(*table_id)
68 );
69 }
70 ret
71 }
72
73 pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
74 self.state_table_info
75 .iter()
76 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
77 .collect()
78 }
79
80 pub fn from_protobuf(state_table_info: &HashMap<TableId, PbStateTableInfo>) -> Self {
81 let state_table_info = state_table_info
82 .iter()
83 .map(|(table_id, info)| (*table_id, *info))
84 .collect();
85 let compaction_group_member_tables =
86 Self::build_compaction_group_member_tables(&state_table_info);
87 Self {
88 state_table_info,
89 compaction_group_member_tables,
90 }
91 }
92
93 pub fn from_protobuf_owned(state_table_info: HashMap<TableId, PbStateTableInfo>) -> Self {
94 let compaction_group_member_tables =
95 Self::build_compaction_group_member_tables(&state_table_info);
96 Self {
97 state_table_info,
98 compaction_group_member_tables,
99 }
100 }
101
102 pub fn apply_delta(
103 &mut self,
104 delta: &HashMap<TableId, StateTableInfoDelta>,
105 removed_table_id: &HashSet<TableId>,
106 ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
107 let mut changed_table = HashMap::new();
108 let mut has_bumped_committed_epoch = false;
109 fn remove_table_from_compaction_group(
110 compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
111 compaction_group_id: CompactionGroupId,
112 table_id: TableId,
113 ) {
114 let member_tables = compaction_group_member_tables
115 .get_mut(&compaction_group_id)
116 .expect("should exist");
117 assert!(member_tables.remove(&table_id));
118 if member_tables.is_empty() {
119 assert!(
120 compaction_group_member_tables
121 .remove(&compaction_group_id)
122 .is_some()
123 );
124 }
125 }
126 for table_id in removed_table_id {
127 if let Some(prev_info) = self.state_table_info.remove(table_id) {
128 remove_table_from_compaction_group(
129 &mut self.compaction_group_member_tables,
130 prev_info.compaction_group_id,
131 *table_id,
132 );
133 assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
134 } else {
135 warn!(
136 %table_id,
137 "table to remove does not exist"
138 );
139 }
140 }
141 for (table_id, delta) in delta {
142 if removed_table_id.contains(table_id) {
143 continue;
144 }
145 let new_info = StateTableInfo {
146 committed_epoch: delta.committed_epoch,
147 compaction_group_id: delta.compaction_group_id,
148 };
149 match self.state_table_info.entry(*table_id) {
150 Entry::Occupied(mut entry) => {
151 let prev_info = entry.get_mut();
152 assert!(
153 new_info.committed_epoch >= prev_info.committed_epoch,
154 "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
155 table_id,
156 prev_info,
157 new_info
158 );
159 if new_info.committed_epoch > prev_info.committed_epoch {
160 has_bumped_committed_epoch = true;
161 }
162 if prev_info.compaction_group_id != new_info.compaction_group_id {
163 remove_table_from_compaction_group(
165 &mut self.compaction_group_member_tables,
166 prev_info.compaction_group_id,
167 *table_id,
168 );
169 assert!(
170 self.compaction_group_member_tables
171 .entry(new_info.compaction_group_id)
172 .or_default()
173 .insert(*table_id)
174 );
175 }
176 let prev_info = replace(prev_info, new_info);
177 changed_table.insert(*table_id, Some(prev_info));
178 }
179 Entry::Vacant(entry) => {
180 assert!(
181 self.compaction_group_member_tables
182 .entry(new_info.compaction_group_id)
183 .or_default()
184 .insert(*table_id)
185 );
186 has_bumped_committed_epoch = true;
187 entry.insert(new_info);
188 changed_table.insert(*table_id, None);
189 }
190 }
191 }
192 debug_assert_eq!(
193 self.compaction_group_member_tables,
194 Self::build_compaction_group_member_tables(&self.state_table_info)
195 );
196 (changed_table, has_bumped_committed_epoch)
197 }
198
199 pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
200 &self.state_table_info
201 }
202
203 pub fn compaction_group_member_table_ids(
204 &self,
205 compaction_group_id: CompactionGroupId,
206 ) -> &BTreeSet<TableId> {
207 static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
208 self.compaction_group_member_tables
209 .get(&compaction_group_id)
210 .unwrap_or_else(|| EMPTY_SET.deref())
211 }
212
213 pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
214 &self.compaction_group_member_tables
215 }
216
217 pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
218 self.state_table_info
219 .values()
220 .map(|info| info.committed_epoch)
221 .max()
222 }
223}
224
225#[derive(Debug, Clone, PartialEq)]
226pub struct HummockVersionCommon<T, L = T> {
227 pub id: HummockVersionId,
228 pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
229 #[deprecated]
230 pub(crate) max_committed_epoch: u64,
231 pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
232 #[deprecated]
233 pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
234 pub state_table_info: HummockVersionStateTableInfo,
235 pub vector_indexes: HashMap<TableId, VectorIndex>,
236}
237
238pub type HummockVersion = HummockVersionCommon<SstableInfo>;
239
240pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
241
242impl Default for HummockVersion {
243 fn default() -> Self {
244 HummockVersion::from(&PbHummockVersion::default())
245 }
246}
247
248impl<T> HummockVersionCommon<T>
249where
250 T: for<'a> From<&'a PbSstableInfo>,
251 PbSstableInfo: for<'a> From<&'a T>,
252{
253 pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
256 pb_version.into()
257 }
258
259 pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
262 pb_version.into()
263 }
264
265 pub fn to_protobuf(&self) -> PbHummockVersion {
266 self.into()
267 }
268}
269
270impl<T> HummockVersionCommon<T>
271where
272 T: From<PbSstableInfo>,
273 PbSstableInfo: for<'a> From<&'a T>,
274{
275 pub fn from_persisted_protobuf_owned(pb_version: PbHummockVersion) -> Self {
278 pb_version.into()
279 }
280}
281
282impl HummockVersion {
283 #[expect(deprecated)]
284 pub fn estimated_encode_len(&self) -> usize {
285 self.levels.len() * size_of::<CompactionGroupId>()
286 + self
287 .levels
288 .values()
289 .map(|level| level.estimated_encode_len())
290 .sum::<usize>()
291 + self.table_watermarks.len() * size_of::<u32>()
292 + self
293 .table_watermarks
294 .values()
295 .map(|table_watermark| table_watermark.estimated_encode_len())
296 .sum::<usize>()
297 + self
298 .table_change_log
299 .values()
300 .map(|c| {
301 c.iter()
302 .map(|l| {
303 l.old_value
304 .iter()
305 .chain(l.new_value.iter())
306 .map(|s| s.estimated_encode_len())
307 .sum::<usize>()
308 })
309 .sum::<usize>()
310 })
311 .sum::<usize>()
312 }
313}
314
315impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
316where
317 T: for<'a> From<&'a PbSstableInfo>,
318{
319 fn from(pb_version: &PbHummockVersion) -> Self {
320 #[expect(deprecated)]
321 Self {
322 id: pb_version.id,
323 levels: pb_version
324 .levels
325 .iter()
326 .map(|(group_id, levels)| (*group_id, LevelsCommon::from(levels)))
327 .collect(),
328 max_committed_epoch: pb_version.max_committed_epoch,
329 table_watermarks: pb_version
330 .table_watermarks
331 .iter()
332 .map(|(table_id, table_watermark)| {
333 (*table_id, Arc::new(TableWatermarks::from(table_watermark)))
334 })
335 .collect(),
336 table_change_log: pb_version
337 .table_change_logs
338 .iter()
339 .map(|(table_id, change_log)| {
340 (*table_id, TableChangeLogCommon::from_protobuf(change_log))
341 })
342 .collect(),
343 state_table_info: HummockVersionStateTableInfo::from_protobuf(
344 &pb_version.state_table_info,
345 ),
346 vector_indexes: pb_version
347 .vector_indexes
348 .iter()
349 .map(|(table_id, index)| (*table_id, index.clone().into()))
350 .collect(),
351 }
352 }
353}
354
355impl<T> From<PbHummockVersion> for HummockVersionCommon<T>
356where
357 T: From<PbSstableInfo>,
358{
359 fn from(pb_version: PbHummockVersion) -> Self {
360 #[expect(deprecated)]
361 Self {
362 id: pb_version.id,
363 levels: pb_version
364 .levels
365 .into_iter()
366 .map(|(group_id, levels)| (group_id, LevelsCommon::from(levels)))
367 .collect(),
368 max_committed_epoch: pb_version.max_committed_epoch,
369 table_watermarks: pb_version
370 .table_watermarks
371 .into_iter()
372 .map(|(table_id, table_watermark)| {
373 (table_id, Arc::new(TableWatermarks::from(table_watermark)))
374 })
375 .collect(),
376 table_change_log: pb_version
377 .table_change_logs
378 .into_iter()
379 .map(|(table_id, change_log)| {
380 (
381 table_id,
382 TableChangeLogCommon::from_protobuf_owned(change_log),
383 )
384 })
385 .collect(),
386 state_table_info: HummockVersionStateTableInfo::from_protobuf_owned(
387 pb_version.state_table_info,
388 ),
389 vector_indexes: pb_version
390 .vector_indexes
391 .into_iter()
392 .map(|(table_id, index)| (table_id, index.into()))
393 .collect(),
394 }
395 }
396}
397
398impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
399where
400 PbSstableInfo: for<'a> From<&'a T>,
401{
402 fn from(version: &HummockVersionCommon<T>) -> Self {
403 #[expect(deprecated)]
404 Self {
405 id: version.id,
406 levels: version
407 .levels
408 .iter()
409 .map(|(group_id, levels)| (*group_id, levels.into()))
410 .collect(),
411 max_committed_epoch: version.max_committed_epoch,
412 table_watermarks: version
413 .table_watermarks
414 .iter()
415 .map(|(table_id, watermark)| (*table_id, watermark.as_ref().into()))
416 .collect(),
417 table_change_logs: version
418 .table_change_log
419 .iter()
420 .map(|(table_id, change_log)| (*table_id, change_log.to_protobuf()))
421 .collect(),
422 state_table_info: version.state_table_info.state_table_info.clone(),
423 vector_indexes: version
424 .vector_indexes
425 .iter()
426 .map(|(table_id, index)| (*table_id, index.clone().into()))
427 .collect(),
428 }
429 }
430}
431
432impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
433where
434 PbSstableInfo: From<T>,
435 PbSstableInfo: for<'a> From<&'a T>,
436{
437 fn from(version: HummockVersionCommon<T>) -> Self {
438 #[expect(deprecated)]
439 Self {
440 id: version.id,
441 levels: version
442 .levels
443 .into_iter()
444 .map(|(group_id, levels)| (group_id, levels.into()))
445 .collect(),
446 max_committed_epoch: version.max_committed_epoch,
447 table_watermarks: version
448 .table_watermarks
449 .into_iter()
450 .map(|(table_id, watermark)| (table_id, watermark.as_ref().into()))
451 .collect(),
452 table_change_logs: version
453 .table_change_log
454 .into_iter()
455 .map(|(table_id, change_log)| (table_id, change_log.to_protobuf()))
456 .collect(),
457 state_table_info: version.state_table_info.state_table_info.clone(),
458 vector_indexes: version
459 .vector_indexes
460 .into_iter()
461 .map(|(table_id, index)| (table_id, index.into()))
462 .collect(),
463 }
464 }
465}
466
467impl HummockVersion {
468 pub fn next_version_id(&self) -> HummockVersionId {
469 self.id + 1
470 }
471
472 pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
473 self.state_table_info.state_table_info.is_empty()
475 && self.levels.values().any(|group| {
476 #[expect(deprecated)]
478 !group.member_table_ids.is_empty()
479 })
480 }
481
482 pub fn may_fill_backward_compatible_state_table_info_delta(
483 &self,
484 delta: &mut HummockVersionDelta,
485 ) {
486 #[expect(deprecated)]
487 for (cg_id, group) in &self.levels {
489 for table_id in &group.member_table_ids {
490 assert!(
491 delta
492 .state_table_info_delta
493 .insert(
494 (*table_id).into(),
495 StateTableInfoDelta {
496 committed_epoch: self.max_committed_epoch,
497 compaction_group_id: *cg_id,
498 }
499 )
500 .is_none(),
501 "duplicate table id {} in cg {}",
502 table_id,
503 cg_id
504 );
505 }
506 }
507 }
508
509 pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
510 #[expect(deprecated)]
511 let mut init_version = HummockVersion {
512 id: FIRST_VERSION_ID,
513 levels: Default::default(),
514 max_committed_epoch: INVALID_EPOCH,
515 table_watermarks: HashMap::new(),
516 table_change_log: HashMap::new(),
517 state_table_info: HummockVersionStateTableInfo::empty(),
518 vector_indexes: Default::default(),
519 };
520 for group_id in [
521 StaticCompactionGroupId::StateDefault as CompactionGroupId,
522 StaticCompactionGroupId::MaterializedView as CompactionGroupId,
523 ] {
524 init_version.levels.insert(
525 group_id,
526 build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
527 );
528 }
529 init_version
530 }
531
532 pub fn version_delta_after(&self) -> HummockVersionDelta {
533 #[expect(deprecated)]
534 HummockVersionDelta {
535 id: self.next_version_id(),
536 prev_id: self.id,
537 trivial_move: false,
538 max_committed_epoch: self.max_committed_epoch,
539 group_deltas: Default::default(),
540 new_table_watermarks: HashMap::new(),
541 removed_table_ids: HashSet::new(),
542 change_log_delta: HashMap::new(),
543 state_table_info_delta: Default::default(),
544 vector_index_delta: Default::default(),
545 }
546 }
547}
548
549impl<T, L> HummockVersionCommon<T, L> {
550 pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
551 self.state_table_info
552 .info()
553 .get(&table_id)
554 .map(|info| info.committed_epoch)
555 }
556}
557
558#[derive(Debug, PartialEq, Clone)]
559pub struct HummockVersionDeltaCommon<T, L = T> {
560 pub id: HummockVersionId,
561 pub prev_id: HummockVersionId,
562 pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
563 #[deprecated]
564 pub(crate) max_committed_epoch: u64,
565 pub trivial_move: bool,
566 pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
567 pub removed_table_ids: HashSet<TableId>,
568 pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
569 pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
570 pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
571}
572
573pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;
574
575pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
576
577impl Default for HummockVersionDelta {
578 fn default() -> Self {
579 HummockVersionDelta::from(&PbHummockVersionDelta::default())
580 }
581}
582
583impl<T> HummockVersionDeltaCommon<T>
584where
585 T: for<'a> From<&'a PbSstableInfo>,
586 PbSstableInfo: for<'a> From<&'a T>,
587{
588 pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
591 delta.into()
592 }
593
594 pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
597 delta.into()
598 }
599
600 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
601 self.into()
602 }
603}
604
605impl<T> HummockVersionDeltaCommon<T>
606where
607 T: From<PbSstableInfo>,
608 PbSstableInfo: for<'a> From<&'a T>,
609{
610 pub fn from_persisted_protobuf_owned(delta: PbHummockVersionDelta) -> Self {
613 delta.into()
614 }
615}
616
617pub trait SstableIdReader {
618 fn sst_id(&self) -> HummockSstableId;
619}
620
621pub trait ObjectIdReader {
622 fn object_id(&self) -> HummockSstableObjectId;
623}
624
625impl<T> HummockVersionDeltaCommon<T>
626where
627 T: SstableIdReader + ObjectIdReader,
628{
629 pub fn newly_added_object_ids(
634 &self,
635 exclude_table_change_log: bool,
636 ) -> HashSet<HummockObjectId> {
637 match HummockObjectId::Sstable(0.into()) {
641 HummockObjectId::Sstable(_) => {}
642 HummockObjectId::VectorFile(_) => {}
643 HummockObjectId::HnswGraphFile(_) => {}
644 };
645 self.newly_added_sst_infos(exclude_table_change_log)
646 .map(|sst| HummockObjectId::Sstable(sst.object_id()))
647 .chain(
648 self.vector_index_delta
649 .values()
650 .flat_map(|vector_index_delta| {
651 vector_index_delta
652 .newly_added_objects()
653 .map(|(object_id, _)| object_id)
654 }),
655 )
656 .collect()
657 }
658
659 pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
660 self.newly_added_sst_infos(exclude_table_change_log)
661 .map(|sst| sst.sst_id())
662 .collect()
663 }
664}
665
666impl<T> HummockVersionDeltaCommon<T> {
667 pub fn newly_added_sst_infos(
668 &self,
669 exclude_table_change_log: bool,
670 ) -> impl Iterator<Item = &'_ T> {
671 let may_table_change_delta = if exclude_table_change_log {
672 None
673 } else {
674 Some(self.change_log_delta.values())
675 };
676 self.group_deltas
677 .values()
678 .flat_map(|group_deltas| {
679 group_deltas.group_deltas.iter().flat_map(|group_delta| {
680 let sst_slice = match &group_delta {
681 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
682 | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
683 inserted_table_infos,
684 ..
685 }) => Some(inserted_table_infos.iter()),
686 GroupDeltaCommon::GroupConstruct(_)
687 | GroupDeltaCommon::GroupDestroy(_)
688 | GroupDeltaCommon::GroupMerge(_)
689 | GroupDeltaCommon::TruncateTables(_) => None,
690 };
691 sst_slice.into_iter().flatten()
692 })
693 })
694 .chain(
695 may_table_change_delta
696 .map(|v| {
697 v.flat_map(|delta| {
698 let new_log = &delta.new_log;
699 new_log.new_value.iter().chain(new_log.old_value.iter())
700 })
701 })
702 .into_iter()
703 .flatten(),
704 )
705 }
706}
707
708impl HummockVersionDelta {
709 #[expect(deprecated)]
710 pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
711 self.max_committed_epoch
712 }
713}
714
715impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
716where
717 T: for<'a> From<&'a PbSstableInfo>,
718{
719 fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
720 #[expect(deprecated)]
721 Self {
722 id: pb_version_delta.id,
723 prev_id: pb_version_delta.prev_id,
724 group_deltas: pb_version_delta
725 .group_deltas
726 .iter()
727 .map(|(group_id, deltas)| (*group_id, GroupDeltasCommon::from(deltas)))
728 .collect(),
729 max_committed_epoch: pb_version_delta.max_committed_epoch,
730 trivial_move: pb_version_delta.trivial_move,
731 new_table_watermarks: pb_version_delta
732 .new_table_watermarks
733 .iter()
734 .map(|(table_id, watermarks)| (*table_id, TableWatermarks::from(watermarks)))
735 .collect(),
736 removed_table_ids: pb_version_delta.removed_table_ids.iter().copied().collect(),
737 change_log_delta: pb_version_delta
738 .change_log_delta
739 .iter()
740 .map(|(table_id, log_delta)| {
741 (
742 *table_id,
743 ChangeLogDeltaCommon {
744 truncate_epoch: log_delta.truncate_epoch,
745 new_log: log_delta.new_log.as_ref().unwrap().into(),
746 },
747 )
748 })
749 .collect(),
750
751 state_table_info_delta: pb_version_delta
752 .state_table_info_delta
753 .iter()
754 .map(|(table_id, delta)| (*table_id, *delta))
755 .collect(),
756 vector_index_delta: pb_version_delta
757 .vector_index_delta
758 .iter()
759 .map(|(table_id, delta)| (*table_id, delta.clone().into()))
760 .collect(),
761 }
762 }
763}
764
765impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
766where
767 PbSstableInfo: for<'a> From<&'a T>,
768{
769 fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
770 #[expect(deprecated)]
771 Self {
772 id: version_delta.id,
773 prev_id: version_delta.prev_id,
774 group_deltas: version_delta
775 .group_deltas
776 .iter()
777 .map(|(group_id, deltas)| (*group_id, deltas.into()))
778 .collect(),
779 max_committed_epoch: version_delta.max_committed_epoch,
780 trivial_move: version_delta.trivial_move,
781 new_table_watermarks: version_delta
782 .new_table_watermarks
783 .iter()
784 .map(|(table_id, watermarks)| (*table_id, watermarks.into()))
785 .collect(),
786 removed_table_ids: version_delta.removed_table_ids.iter().copied().collect(),
787 change_log_delta: version_delta
788 .change_log_delta
789 .iter()
790 .map(|(table_id, log_delta)| (*table_id, log_delta.into()))
791 .collect(),
792 state_table_info_delta: version_delta.state_table_info_delta.clone(),
793 vector_index_delta: version_delta
794 .vector_index_delta
795 .iter()
796 .map(|(table_id, delta)| (*table_id, delta.clone().into()))
797 .collect(),
798 }
799 }
800}
801
802impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
803where
804 PbSstableInfo: From<T>,
805{
806 fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
807 #[expect(deprecated)]
808 Self {
809 id: version_delta.id,
810 prev_id: version_delta.prev_id,
811 group_deltas: version_delta
812 .group_deltas
813 .into_iter()
814 .map(|(group_id, deltas)| (group_id, deltas.into()))
815 .collect(),
816 max_committed_epoch: version_delta.max_committed_epoch,
817 trivial_move: version_delta.trivial_move,
818 new_table_watermarks: version_delta
819 .new_table_watermarks
820 .into_iter()
821 .map(|(table_id, watermarks)| (table_id, watermarks.into()))
822 .collect(),
823 removed_table_ids: version_delta.removed_table_ids.into_iter().collect(),
824 change_log_delta: version_delta
825 .change_log_delta
826 .into_iter()
827 .map(|(table_id, log_delta)| (table_id, log_delta.into()))
828 .collect(),
829 state_table_info_delta: version_delta.state_table_info_delta,
830 vector_index_delta: version_delta
831 .vector_index_delta
832 .into_iter()
833 .map(|(table_id, delta)| (table_id, delta.into()))
834 .collect(),
835 }
836 }
837}
838
839impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
840where
841 T: From<PbSstableInfo>,
842{
843 fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
844 #[expect(deprecated)]
845 Self {
846 id: pb_version_delta.id,
847 prev_id: pb_version_delta.prev_id,
848 group_deltas: pb_version_delta
849 .group_deltas
850 .into_iter()
851 .map(|(group_id, deltas)| (group_id, deltas.into()))
852 .collect(),
853 max_committed_epoch: pb_version_delta.max_committed_epoch,
854 trivial_move: pb_version_delta.trivial_move,
855 new_table_watermarks: pb_version_delta
856 .new_table_watermarks
857 .into_iter()
858 .map(|(table_id, watermarks)| (table_id, watermarks.into()))
859 .collect(),
860 removed_table_ids: pb_version_delta.removed_table_ids.into_iter().collect(),
861 change_log_delta: pb_version_delta
862 .change_log_delta
863 .into_iter()
864 .map(|(table_id, log_delta)| (table_id, log_delta.into()))
865 .collect(),
866 state_table_info_delta: pb_version_delta.state_table_info_delta,
867 vector_index_delta: pb_version_delta
868 .vector_index_delta
869 .into_iter()
870 .map(|(table_id, delta)| (table_id, delta.into()))
871 .collect(),
872 }
873 }
874}
875
876#[derive(Debug, PartialEq, Clone)]
877pub struct IntraLevelDeltaCommon<T> {
878 pub level_idx: u32,
879 pub l0_sub_level_id: u64,
880 pub removed_table_ids: HashSet<HummockSstableId>,
881 pub inserted_table_infos: Vec<T>,
882 pub vnode_partition_count: u32,
883 pub compaction_group_version_id: u64,
884}
885
886pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
887
888impl IntraLevelDelta {
889 pub fn estimated_encode_len(&self) -> usize {
890 size_of::<u32>()
891 + size_of::<u64>()
892 + self.removed_table_ids.len() * size_of::<u32>()
893 + self
894 .inserted_table_infos
895 .iter()
896 .map(|sst| sst.estimated_encode_len())
897 .sum::<usize>()
898 + size_of::<u32>()
899 }
900}
901
902impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
903where
904 T: From<PbSstableInfo>,
905{
906 fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
907 Self {
908 level_idx: pb_intra_level_delta.level_idx,
909 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
910 removed_table_ids: HashSet::from_iter(
911 pb_intra_level_delta.removed_table_ids.iter().copied(),
912 ),
913 inserted_table_infos: pb_intra_level_delta
914 .inserted_table_infos
915 .into_iter()
916 .map(Into::into)
917 .collect_vec(),
918 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
919 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
920 }
921 }
922}
923
924impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
925where
926 PbSstableInfo: From<T>,
927{
928 fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
929 Self {
930 level_idx: intra_level_delta.level_idx,
931 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
932 removed_table_ids: intra_level_delta.removed_table_ids.into_iter().collect(),
933 inserted_table_infos: intra_level_delta
934 .inserted_table_infos
935 .into_iter()
936 .map(Into::into)
937 .collect_vec(),
938 vnode_partition_count: intra_level_delta.vnode_partition_count,
939 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
940 }
941 }
942}
943
944impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
945where
946 PbSstableInfo: for<'a> From<&'a T>,
947{
948 fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
949 Self {
950 level_idx: intra_level_delta.level_idx,
951 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
952 removed_table_ids: intra_level_delta
953 .removed_table_ids
954 .iter()
955 .copied()
956 .collect(),
957 inserted_table_infos: intra_level_delta
958 .inserted_table_infos
959 .iter()
960 .map(Into::into)
961 .collect_vec(),
962 vnode_partition_count: intra_level_delta.vnode_partition_count,
963 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
964 }
965 }
966}
967
968impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
969where
970 T: for<'a> From<&'a PbSstableInfo>,
971{
972 fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
973 Self {
974 level_idx: pb_intra_level_delta.level_idx,
975 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
976 removed_table_ids: HashSet::from_iter(
977 pb_intra_level_delta.removed_table_ids.iter().copied(),
978 ),
979 inserted_table_infos: pb_intra_level_delta
980 .inserted_table_infos
981 .iter()
982 .map(Into::into)
983 .collect_vec(),
984 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
985 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
986 }
987 }
988}
989
990impl IntraLevelDelta {
991 pub fn new(
992 level_idx: u32,
993 l0_sub_level_id: u64,
994 removed_table_ids: HashSet<HummockSstableId>,
995 inserted_table_infos: Vec<SstableInfo>,
996 vnode_partition_count: u32,
997 compaction_group_version_id: u64,
998 ) -> Self {
999 Self {
1000 level_idx,
1001 l0_sub_level_id,
1002 removed_table_ids,
1003 inserted_table_infos,
1004 vnode_partition_count,
1005 compaction_group_version_id,
1006 }
1007 }
1008}
1009
1010#[derive(Debug, PartialEq, Clone)]
1011pub enum GroupDeltaCommon<T> {
1012 NewL0SubLevel(Vec<T>),
1013 IntraLevel(IntraLevelDeltaCommon<T>),
1014 GroupConstruct(Box<PbGroupConstruct>),
1015 GroupDestroy(PbGroupDestroy),
1016 GroupMerge(PbGroupMerge),
1017 TruncateTables(HashSet<TableId>),
1018}
1019
1020pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
1021
1022impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
1023where
1024 T: From<PbSstableInfo>,
1025{
1026 fn from(pb_group_delta: PbGroupDelta) -> Self {
1027 match pb_group_delta.delta_type {
1028 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1029 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1030 }
1031 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1032 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
1033 }
1034 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1035 GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1036 }
1037 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1038 GroupDeltaCommon::GroupMerge(pb_group_merge)
1039 }
1040 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1041 pb_new_sub_level
1042 .inserted_table_infos
1043 .into_iter()
1044 .map(T::from)
1045 .collect(),
1046 ),
1047 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1048 GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids.into_iter().collect())
1049 }
1050
1051 None => panic!("delta_type is not set"),
1052 }
1053 }
1054}
1055
1056impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1057where
1058 PbSstableInfo: From<T>,
1059{
1060 fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1061 match group_delta {
1062 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1063 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1064 },
1065 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1066 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1067 },
1068 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1069 delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1070 },
1071 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1072 delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1073 },
1074 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1075 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1076 inserted_table_infos: new_sub_level
1077 .into_iter()
1078 .map(PbSstableInfo::from)
1079 .collect(),
1080 })),
1081 },
1082 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1083 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1084 table_ids: table_ids.iter().copied().collect(),
1085 })),
1086 },
1087 }
1088 }
1089}
1090
1091impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1092where
1093 PbSstableInfo: for<'a> From<&'a T>,
1094{
1095 fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1096 match group_delta {
1097 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1098 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1099 },
1100 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1101 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1102 },
1103 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1104 delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1105 },
1106 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1107 delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1108 },
1109 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1110 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1111 inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1112 })),
1113 },
1114 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1115 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1116 table_ids: table_ids.iter().copied().collect(),
1117 })),
1118 },
1119 }
1120 }
1121}
1122
1123impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1124where
1125 T: for<'a> From<&'a PbSstableInfo>,
1126{
1127 fn from(pb_group_delta: &PbGroupDelta) -> Self {
1128 match &pb_group_delta.delta_type {
1129 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1130 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1131 }
1132 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1133 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1134 }
1135 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1136 GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1137 }
1138 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1139 GroupDeltaCommon::GroupMerge(*pb_group_merge)
1140 }
1141 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1142 pb_new_sub_level
1143 .inserted_table_infos
1144 .iter()
1145 .map(T::from)
1146 .collect(),
1147 ),
1148 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1149 GroupDeltaCommon::TruncateTables(
1150 pb_truncate_tables.table_ids.iter().copied().collect(),
1151 )
1152 }
1153 None => panic!("delta_type is not set"),
1154 }
1155 }
1156}
1157
/// A list of [`GroupDeltaCommon`] entries, generic over the SSTable-info
/// representation `T` carried by the individual deltas.
#[derive(Debug, PartialEq, Clone)]
pub struct GroupDeltasCommon<T> {
    /// The contained deltas, in the order they are stored.
    pub group_deltas: Vec<GroupDeltaCommon<T>>,
}
1162
1163impl<T> Default for GroupDeltasCommon<T> {
1164 fn default() -> Self {
1165 Self {
1166 group_deltas: vec![],
1167 }
1168 }
1169}
1170
/// [`GroupDeltasCommon`] specialized to [`SstableInfo`].
pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1172
1173impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1174where
1175 T: From<PbSstableInfo>,
1176{
1177 fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1178 Self {
1179 group_deltas: pb_group_deltas
1180 .group_deltas
1181 .into_iter()
1182 .map(GroupDeltaCommon::from)
1183 .collect_vec(),
1184 }
1185 }
1186}
1187
1188impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1189where
1190 PbSstableInfo: From<T>,
1191{
1192 fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1193 Self {
1194 group_deltas: group_deltas
1195 .group_deltas
1196 .into_iter()
1197 .map(|group_delta| group_delta.into())
1198 .collect_vec(),
1199 }
1200 }
1201}
1202
1203impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1204where
1205 PbSstableInfo: for<'a> From<&'a T>,
1206{
1207 fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1208 Self {
1209 group_deltas: group_deltas
1210 .group_deltas
1211 .iter()
1212 .map(|group_delta| group_delta.into())
1213 .collect_vec(),
1214 }
1215 }
1216}
1217
1218impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1219where
1220 T: for<'a> From<&'a PbSstableInfo>,
1221{
1222 fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1223 Self {
1224 group_deltas: pb_group_deltas
1225 .group_deltas
1226 .iter()
1227 .map(GroupDeltaCommon::from)
1228 .collect_vec(),
1229 }
1230 }
1231}
1232
1233impl<T> GroupDeltasCommon<T>
1234where
1235 PbSstableInfo: for<'a> From<&'a T>,
1236{
1237 pub fn to_protobuf(&self) -> PbGroupDeltas {
1238 self.into()
1239 }
1240}
1241
1242impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1243 #[expect(deprecated)]
1244 fn from(delta: HummockVersionDelta) -> Self {
1245 Self {
1246 id: delta.id,
1247 prev_id: delta.prev_id,
1248 group_deltas: delta.group_deltas,
1249 max_committed_epoch: delta.max_committed_epoch,
1250 trivial_move: delta.trivial_move,
1251 new_table_watermarks: delta.new_table_watermarks,
1252 removed_table_ids: delta.removed_table_ids,
1253 change_log_delta: delta
1254 .change_log_delta
1255 .into_iter()
1256 .map(|(k, v)| {
1257 (
1258 k,
1259 ChangeLogDeltaCommon {
1260 truncate_epoch: v.truncate_epoch,
1261 new_log: EpochNewChangeLogCommon {
1262 new_value: Vec::new(),
1263 old_value: Vec::new(),
1264 non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1265 checkpoint_epoch: v.new_log.checkpoint_epoch,
1266 },
1267 },
1268 )
1269 })
1270 .collect(),
1271 state_table_info_delta: delta.state_table_info_delta,
1272 vector_index_delta: delta.vector_index_delta,
1273 }
1274 }
1275}
1276
/// Converts a full `HummockVersion` into a `LocalHummockVersion`.
impl From<HummockVersion> for LocalHummockVersion {
    /// Moves `id`, `levels`, `max_committed_epoch`, `table_watermarks`,
    /// `state_table_info` and `vector_indexes` across unchanged; the
    /// resulting `table_change_log` is always an empty map.
    // NOTE(review): the `expect(deprecated)` presumably covers the
    // `max_committed_epoch` field access — confirm against the struct definition.
    #[expect(deprecated)]
    fn from(version: HummockVersion) -> Self {
        Self {
            id: version.id,
            levels: version.levels,
            max_committed_epoch: version.max_committed_epoch,
            table_watermarks: version.table_watermarks,
            // The source version's change log is not carried over.
            table_change_log: HashMap::default(),
            state_table_info: version.state_table_info,
            vector_indexes: version.vector_indexes,
        }
    }
}