1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{
30 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
31};
32use crate::compaction_group::StaticCompactionGroupId;
33use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
34use crate::level::LevelsCommon;
35use crate::sstable_info::SstableInfo;
36use crate::table_watermark::TableWatermarks;
37use crate::vector_index::{VectorIndex, VectorIndexDelta};
38use crate::{
39 CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
40 HummockSstableObjectId, HummockVersionId,
41};
42
/// Upper bound for hummock version ids: `i64::MAX`, cast to the integer type
/// accepted by `HummockVersionId::new`.
pub const MAX_HUMMOCK_VERSION_ID: HummockVersionId = HummockVersionId::new(i64::MAX as _);
44
/// Per-table state info of a hummock version, plus a reverse index from each
/// compaction group to the tables it contains.
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionStateTableInfo {
    /// Info of every state table, keyed by table id.
    state_table_info: HashMap<TableId, PbStateTableInfo>,

    /// Table ids grouped by their `compaction_group_id`. Derived from
    /// `state_table_info` and kept in sync with it by `apply_delta`.
    compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
}
52
53impl HummockVersionStateTableInfo {
54 pub fn empty() -> Self {
55 Self {
56 state_table_info: HashMap::new(),
57 compaction_group_member_tables: HashMap::new(),
58 }
59 }
60
61 fn build_compaction_group_member_tables(
62 state_table_info: &HashMap<TableId, PbStateTableInfo>,
63 ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
64 let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
65 for (table_id, info) in state_table_info {
66 assert!(
67 ret.entry(info.compaction_group_id)
68 .or_default()
69 .insert(*table_id)
70 );
71 }
72 ret
73 }
74
75 pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
76 self.state_table_info
77 .iter()
78 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
79 .collect()
80 }
81
82 pub fn from_protobuf(state_table_info: &HashMap<TableId, PbStateTableInfo>) -> Self {
83 let state_table_info = state_table_info
84 .iter()
85 .map(|(table_id, info)| (*table_id, *info))
86 .collect();
87 let compaction_group_member_tables =
88 Self::build_compaction_group_member_tables(&state_table_info);
89 Self {
90 state_table_info,
91 compaction_group_member_tables,
92 }
93 }
94
95 pub fn from_protobuf_owned(state_table_info: HashMap<TableId, PbStateTableInfo>) -> Self {
96 let compaction_group_member_tables =
97 Self::build_compaction_group_member_tables(&state_table_info);
98 Self {
99 state_table_info,
100 compaction_group_member_tables,
101 }
102 }
103
104 pub fn apply_delta(
105 &mut self,
106 delta: &HashMap<TableId, StateTableInfoDelta>,
107 removed_table_id: &HashSet<TableId>,
108 ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
109 let mut changed_table = HashMap::new();
110 let mut has_bumped_committed_epoch = false;
111 fn remove_table_from_compaction_group(
112 compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
113 compaction_group_id: CompactionGroupId,
114 table_id: TableId,
115 ) {
116 let member_tables = compaction_group_member_tables
117 .get_mut(&compaction_group_id)
118 .expect("should exist");
119 assert!(member_tables.remove(&table_id));
120 if member_tables.is_empty() {
121 assert!(
122 compaction_group_member_tables
123 .remove(&compaction_group_id)
124 .is_some()
125 );
126 }
127 }
128 for table_id in removed_table_id {
129 if let Some(prev_info) = self.state_table_info.remove(table_id) {
130 remove_table_from_compaction_group(
131 &mut self.compaction_group_member_tables,
132 prev_info.compaction_group_id,
133 *table_id,
134 );
135 assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
136 } else {
137 warn!(
138 %table_id,
139 "table to remove does not exist"
140 );
141 }
142 }
143 for (table_id, delta) in delta {
144 if removed_table_id.contains(table_id) {
145 continue;
146 }
147 let new_info = StateTableInfo {
148 committed_epoch: delta.committed_epoch,
149 compaction_group_id: delta.compaction_group_id,
150 };
151 match self.state_table_info.entry(*table_id) {
152 Entry::Occupied(mut entry) => {
153 let prev_info = entry.get_mut();
154 assert!(
155 new_info.committed_epoch >= prev_info.committed_epoch,
156 "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
157 table_id,
158 prev_info,
159 new_info
160 );
161 if new_info.committed_epoch > prev_info.committed_epoch {
162 has_bumped_committed_epoch = true;
163 }
164 if prev_info.compaction_group_id != new_info.compaction_group_id {
165 remove_table_from_compaction_group(
167 &mut self.compaction_group_member_tables,
168 prev_info.compaction_group_id,
169 *table_id,
170 );
171 assert!(
172 self.compaction_group_member_tables
173 .entry(new_info.compaction_group_id)
174 .or_default()
175 .insert(*table_id)
176 );
177 }
178 let prev_info = replace(prev_info, new_info);
179 changed_table.insert(*table_id, Some(prev_info));
180 }
181 Entry::Vacant(entry) => {
182 assert!(
183 self.compaction_group_member_tables
184 .entry(new_info.compaction_group_id)
185 .or_default()
186 .insert(*table_id)
187 );
188 has_bumped_committed_epoch = true;
189 entry.insert(new_info);
190 changed_table.insert(*table_id, None);
191 }
192 }
193 }
194 debug_assert_eq!(
195 self.compaction_group_member_tables,
196 Self::build_compaction_group_member_tables(&self.state_table_info)
197 );
198 (changed_table, has_bumped_committed_epoch)
199 }
200
201 pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
202 &self.state_table_info
203 }
204
205 pub fn compaction_group_member_table_ids(
206 &self,
207 compaction_group_id: CompactionGroupId,
208 ) -> &BTreeSet<TableId> {
209 static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
210 self.compaction_group_member_tables
211 .get(&compaction_group_id)
212 .unwrap_or_else(|| EMPTY_SET.deref())
213 }
214
215 pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
216 &self.compaction_group_member_tables
217 }
218
219 pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
220 self.state_table_info
221 .values()
222 .map(|info| info.committed_epoch)
223 .max()
224 }
225}
226
/// In-memory form of a hummock version.
///
/// `T` is the SST info type stored in `levels`; `L` (defaulting to `T`) is the
/// type parameter of the table change logs.
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionCommon<T, L = T> {
    /// Id of this version.
    pub id: HummockVersionId,
    /// Levels of each compaction group.
    pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
    /// Deprecated; still carried through the protobuf conversions in this file.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    /// Watermarks of each table, shared through `Arc`.
    pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
    /// Change log of each table.
    pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
    /// Per-table committed epoch and compaction group membership.
    pub state_table_info: HummockVersionStateTableInfo,
    /// Vector index of each table.
    pub vector_indexes: HashMap<TableId, VectorIndex>,
}
238
/// Version whose levels and change logs are both parameterized by `SstableInfo`.
pub type HummockVersion = HummockVersionCommon<SstableInfo>;

/// Version whose change-log type parameter is `()`; see
/// `HummockVersion::split_change_log` for how one is produced.
pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
242
243impl Default for HummockVersion {
244 fn default() -> Self {
245 HummockVersion::from(&PbHummockVersion::default())
246 }
247}
248
249impl<T> HummockVersionCommon<T>
250where
251 T: for<'a> From<&'a PbSstableInfo>,
252 PbSstableInfo: for<'a> From<&'a T>,
253{
254 pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
257 pb_version.into()
258 }
259
260 pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
263 pb_version.into()
264 }
265
266 pub fn to_protobuf(&self) -> PbHummockVersion {
267 self.into()
268 }
269}
270
271impl<T> HummockVersionCommon<T>
272where
273 T: From<PbSstableInfo>,
274 PbSstableInfo: for<'a> From<&'a T>,
275{
276 pub fn from_persisted_protobuf_owned(pb_version: PbHummockVersion) -> Self {
279 pb_version.into()
280 }
281}
282
283impl HummockVersion {
284 pub fn estimated_encode_len(&self) -> usize {
285 self.levels.len() * size_of::<CompactionGroupId>()
286 + self
287 .levels
288 .values()
289 .map(|level| level.estimated_encode_len())
290 .sum::<usize>()
291 + self.table_watermarks.len() * size_of::<u32>()
292 + self
293 .table_watermarks
294 .values()
295 .map(|table_watermark| table_watermark.estimated_encode_len())
296 .sum::<usize>()
297 + self
298 .table_change_log
299 .values()
300 .map(|c| {
301 c.iter()
302 .map(|l| {
303 l.old_value
304 .iter()
305 .chain(l.new_value.iter())
306 .map(|s| s.estimated_encode_len())
307 .sum::<usize>()
308 })
309 .sum::<usize>()
310 })
311 .sum::<usize>()
312 }
313}
314
315impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
316where
317 T: for<'a> From<&'a PbSstableInfo>,
318{
319 fn from(pb_version: &PbHummockVersion) -> Self {
320 #[expect(deprecated)]
321 Self {
322 id: pb_version.id,
323 levels: pb_version
324 .levels
325 .iter()
326 .map(|(group_id, levels)| (*group_id, LevelsCommon::from(levels)))
327 .collect(),
328 max_committed_epoch: pb_version.max_committed_epoch,
329 table_watermarks: pb_version
330 .table_watermarks
331 .iter()
332 .map(|(table_id, table_watermark)| {
333 (*table_id, Arc::new(TableWatermarks::from(table_watermark)))
334 })
335 .collect(),
336 table_change_log: pb_version
337 .table_change_logs
338 .iter()
339 .map(|(table_id, change_log)| {
340 (*table_id, TableChangeLogCommon::from_protobuf(change_log))
341 })
342 .collect(),
343 state_table_info: HummockVersionStateTableInfo::from_protobuf(
344 &pb_version.state_table_info,
345 ),
346 vector_indexes: pb_version
347 .vector_indexes
348 .iter()
349 .map(|(table_id, index)| (*table_id, index.clone().into()))
350 .collect(),
351 }
352 }
353}
354
355impl<T> From<PbHummockVersion> for HummockVersionCommon<T>
356where
357 T: From<PbSstableInfo>,
358{
359 fn from(pb_version: PbHummockVersion) -> Self {
360 #[expect(deprecated)]
361 Self {
362 id: pb_version.id,
363 levels: pb_version
364 .levels
365 .into_iter()
366 .map(|(group_id, levels)| (group_id, LevelsCommon::from(levels)))
367 .collect(),
368 max_committed_epoch: pb_version.max_committed_epoch,
369 table_watermarks: pb_version
370 .table_watermarks
371 .into_iter()
372 .map(|(table_id, table_watermark)| {
373 (table_id, Arc::new(TableWatermarks::from(table_watermark)))
374 })
375 .collect(),
376 table_change_log: pb_version
377 .table_change_logs
378 .into_iter()
379 .map(|(table_id, change_log)| {
380 (
381 table_id,
382 TableChangeLogCommon::from_protobuf_owned(change_log),
383 )
384 })
385 .collect(),
386 state_table_info: HummockVersionStateTableInfo::from_protobuf_owned(
387 pb_version.state_table_info,
388 ),
389 vector_indexes: pb_version
390 .vector_indexes
391 .into_iter()
392 .map(|(table_id, index)| (table_id, index.into()))
393 .collect(),
394 }
395 }
396}
397
398impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
399where
400 PbSstableInfo: for<'a> From<&'a T>,
401{
402 fn from(version: &HummockVersionCommon<T>) -> Self {
403 #[expect(deprecated)]
404 Self {
405 id: version.id,
406 levels: version
407 .levels
408 .iter()
409 .map(|(group_id, levels)| (*group_id, levels.into()))
410 .collect(),
411 max_committed_epoch: version.max_committed_epoch,
412 table_watermarks: version
413 .table_watermarks
414 .iter()
415 .map(|(table_id, watermark)| (*table_id, watermark.as_ref().into()))
416 .collect(),
417 table_change_logs: version
418 .table_change_log
419 .iter()
420 .map(|(table_id, change_log)| (*table_id, change_log.to_protobuf()))
421 .collect(),
422 state_table_info: version.state_table_info.state_table_info.clone(),
423 vector_indexes: version
424 .vector_indexes
425 .iter()
426 .map(|(table_id, index)| (*table_id, index.clone().into()))
427 .collect(),
428 }
429 }
430}
431
432impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
433where
434 PbSstableInfo: From<T>,
435 PbSstableInfo: for<'a> From<&'a T>,
436{
437 fn from(version: HummockVersionCommon<T>) -> Self {
438 #[expect(deprecated)]
439 Self {
440 id: version.id,
441 levels: version
442 .levels
443 .into_iter()
444 .map(|(group_id, levels)| (group_id, levels.into()))
445 .collect(),
446 max_committed_epoch: version.max_committed_epoch,
447 table_watermarks: version
448 .table_watermarks
449 .into_iter()
450 .map(|(table_id, watermark)| (table_id, watermark.as_ref().into()))
451 .collect(),
452 table_change_logs: version
453 .table_change_log
454 .into_iter()
455 .map(|(table_id, change_log)| (table_id, change_log.to_protobuf()))
456 .collect(),
457 state_table_info: version.state_table_info.state_table_info.clone(),
458 vector_indexes: version
459 .vector_indexes
460 .into_iter()
461 .map(|(table_id, index)| (table_id, index.into()))
462 .collect(),
463 }
464 }
465}
466
467impl HummockVersion {
468 pub fn next_version_id(&self) -> HummockVersionId {
469 self.id + 1
470 }
471
472 pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
473 self.state_table_info.state_table_info.is_empty()
475 && self.levels.values().any(|group| {
476 #[expect(deprecated)]
478 !group.member_table_ids.is_empty()
479 })
480 }
481
482 pub fn may_fill_backward_compatible_state_table_info_delta(
483 &self,
484 delta: &mut HummockVersionDelta,
485 ) {
486 #[expect(deprecated)]
487 for (cg_id, group) in &self.levels {
489 for table_id in &group.member_table_ids {
490 assert!(
491 delta
492 .state_table_info_delta
493 .insert(
494 (*table_id).into(),
495 StateTableInfoDelta {
496 committed_epoch: self.max_committed_epoch,
497 compaction_group_id: *cg_id,
498 }
499 )
500 .is_none(),
501 "duplicate table id {} in cg {}",
502 table_id,
503 cg_id
504 );
505 }
506 }
507 }
508
509 pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
510 #[expect(deprecated)]
511 let mut init_version = HummockVersion {
512 id: FIRST_VERSION_ID,
513 levels: Default::default(),
514 max_committed_epoch: INVALID_EPOCH,
515 table_watermarks: HashMap::new(),
516 table_change_log: HashMap::new(),
517 state_table_info: HummockVersionStateTableInfo::empty(),
518 vector_indexes: Default::default(),
519 };
520 for group_id in [
521 StaticCompactionGroupId::StateDefault as CompactionGroupId,
522 StaticCompactionGroupId::MaterializedView as CompactionGroupId,
523 ] {
524 init_version.levels.insert(
525 group_id,
526 build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
527 );
528 }
529 init_version
530 }
531
532 pub fn version_delta_after(&self) -> HummockVersionDelta {
533 #[expect(deprecated)]
534 HummockVersionDelta {
535 id: self.next_version_id(),
536 prev_id: self.id,
537 trivial_move: false,
538 max_committed_epoch: self.max_committed_epoch,
539 group_deltas: Default::default(),
540 new_table_watermarks: HashMap::new(),
541 removed_table_ids: HashSet::new(),
542 change_log_delta: HashMap::new(),
543 state_table_info_delta: Default::default(),
544 vector_index_delta: Default::default(),
545 }
546 }
547
548 pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
549 let table_change_log = {
550 let mut table_change_log = HashMap::new();
551 for (table_id, log) in &mut self.table_change_log {
552 let change_log_iter =
553 log.change_log_iter_mut()
554 .map(|item| EpochNewChangeLogCommon {
555 new_value: std::mem::take(&mut item.new_value),
556 old_value: std::mem::take(&mut item.old_value),
557 non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
558 checkpoint_epoch: item.checkpoint_epoch,
559 });
560 table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
561 }
562
563 table_change_log
564 };
565
566 let local_version = LocalHummockVersion::from(self);
567
568 (local_version, table_change_log)
569 }
570}
571
572impl<T, L> HummockVersionCommon<T, L> {
573 pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
574 self.state_table_info
575 .info()
576 .get(&table_id)
577 .map(|info| info.committed_epoch)
578 }
579}
580
/// In-memory form of the difference between two consecutive hummock versions.
///
/// `T` is the SST info type used in the group deltas; `L` (defaulting to `T`)
/// is the type parameter of the change-log deltas.
#[derive(Debug, PartialEq, Clone)]
pub struct HummockVersionDeltaCommon<T, L = T> {
    /// Id of the version this delta produces.
    pub id: HummockVersionId,
    /// Id of the version this delta applies to.
    pub prev_id: HummockVersionId,
    /// Deltas of each compaction group.
    pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
    /// Deprecated; still carried through the protobuf conversions in this file.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    /// Flag copied verbatim to and from the protobuf message.
    pub trivial_move: bool,
    /// New watermarks of each table.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Ids of the tables removed by this delta.
    pub removed_table_ids: HashSet<TableId>,
    /// Change-log delta of each table.
    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
    /// New committed epoch and compaction group of each table.
    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
    /// Vector index delta of each table.
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
}
595
/// Version delta whose group deltas and change-log deltas are both
/// parameterized by `SstableInfo`.
pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;

/// Version delta whose change-log type parameter is `()`.
pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
599
600impl Default for HummockVersionDelta {
601 fn default() -> Self {
602 HummockVersionDelta::from(&PbHummockVersionDelta::default())
603 }
604}
605
606impl<T> HummockVersionDeltaCommon<T>
607where
608 T: for<'a> From<&'a PbSstableInfo>,
609 PbSstableInfo: for<'a> From<&'a T>,
610{
611 pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
614 delta.into()
615 }
616
617 pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
620 delta.into()
621 }
622
623 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
624 self.into()
625 }
626}
627
628impl<T> HummockVersionDeltaCommon<T>
629where
630 T: From<PbSstableInfo>,
631 PbSstableInfo: for<'a> From<&'a T>,
632{
633 pub fn from_persisted_protobuf_owned(delta: PbHummockVersionDelta) -> Self {
636 delta.into()
637 }
638}
639
/// Implemented by SST descriptions that can report their SST id.
pub trait SstableIdReader {
    /// Returns the SST id of this item.
    fn sst_id(&self) -> HummockSstableId;
}
643
/// Implemented by SST descriptions that can report the id of their SST object.
pub trait ObjectIdReader {
    /// Returns the SST object id of this item.
    fn object_id(&self) -> HummockSstableObjectId;
}
647
648impl<T> HummockVersionDeltaCommon<T>
649where
650 T: SstableIdReader + ObjectIdReader,
651{
652 pub fn newly_added_object_ids(
657 &self,
658 exclude_table_change_log: bool,
659 ) -> HashSet<HummockObjectId> {
660 match HummockObjectId::Sstable(0.into()) {
664 HummockObjectId::Sstable(_) => {}
665 HummockObjectId::VectorFile(_) => {}
666 HummockObjectId::HnswGraphFile(_) => {}
667 };
668 self.newly_added_sst_infos(exclude_table_change_log)
669 .map(|sst| HummockObjectId::Sstable(sst.object_id()))
670 .chain(
671 self.vector_index_delta
672 .values()
673 .flat_map(|vector_index_delta| {
674 vector_index_delta
675 .newly_added_objects()
676 .map(|(object_id, _)| object_id)
677 }),
678 )
679 .collect()
680 }
681
682 pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
683 self.newly_added_sst_infos(exclude_table_change_log)
684 .map(|sst| sst.sst_id())
685 .collect()
686 }
687}
688
689impl<T> HummockVersionDeltaCommon<T> {
690 pub fn newly_added_sst_infos(
691 &self,
692 exclude_table_change_log: bool,
693 ) -> impl Iterator<Item = &'_ T> {
694 let may_table_change_delta = if exclude_table_change_log {
695 None
696 } else {
697 Some(self.change_log_delta.values())
698 };
699 self.group_deltas
700 .values()
701 .flat_map(|group_deltas| {
702 group_deltas.group_deltas.iter().flat_map(|group_delta| {
703 let sst_slice = match &group_delta {
704 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
705 | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
706 inserted_table_infos,
707 ..
708 }) => Some(inserted_table_infos.iter()),
709 GroupDeltaCommon::GroupConstruct(_)
710 | GroupDeltaCommon::GroupDestroy(_)
711 | GroupDeltaCommon::GroupMerge(_)
712 | GroupDeltaCommon::TruncateTables(_) => None,
713 };
714 sst_slice.into_iter().flatten()
715 })
716 })
717 .chain(
718 may_table_change_delta
719 .map(|v| {
720 v.flat_map(|delta| {
721 let new_log = &delta.new_log;
722 new_log.new_value.iter().chain(new_log.old_value.iter())
723 })
724 })
725 .into_iter()
726 .flatten(),
727 )
728 }
729}
730
impl HummockVersionDelta {
    /// Returns the value of the deprecated `max_committed_epoch` field.
    ///
    /// NOTE(review): judging by its name this accessor is meant for migration
    /// code only — confirm against the callers.
    #[expect(deprecated)]
    pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
        self.max_committed_epoch
    }
}
737
738impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
739where
740 T: for<'a> From<&'a PbSstableInfo>,
741{
742 fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
743 #[expect(deprecated)]
744 Self {
745 id: pb_version_delta.id,
746 prev_id: pb_version_delta.prev_id,
747 group_deltas: pb_version_delta
748 .group_deltas
749 .iter()
750 .map(|(group_id, deltas)| (*group_id, GroupDeltasCommon::from(deltas)))
751 .collect(),
752 max_committed_epoch: pb_version_delta.max_committed_epoch,
753 trivial_move: pb_version_delta.trivial_move,
754 new_table_watermarks: pb_version_delta
755 .new_table_watermarks
756 .iter()
757 .map(|(table_id, watermarks)| (*table_id, TableWatermarks::from(watermarks)))
758 .collect(),
759 removed_table_ids: pb_version_delta.removed_table_ids.iter().copied().collect(),
760 change_log_delta: pb_version_delta
761 .change_log_delta
762 .iter()
763 .map(|(table_id, log_delta)| {
764 (
765 *table_id,
766 ChangeLogDeltaCommon {
767 truncate_epoch: log_delta.truncate_epoch,
768 new_log: log_delta.new_log.as_ref().unwrap().into(),
769 },
770 )
771 })
772 .collect(),
773
774 state_table_info_delta: pb_version_delta
775 .state_table_info_delta
776 .iter()
777 .map(|(table_id, delta)| (*table_id, *delta))
778 .collect(),
779 vector_index_delta: pb_version_delta
780 .vector_index_delta
781 .iter()
782 .map(|(table_id, delta)| (*table_id, delta.clone().into()))
783 .collect(),
784 }
785 }
786}
787
788impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
789where
790 PbSstableInfo: for<'a> From<&'a T>,
791{
792 fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
793 #[expect(deprecated)]
794 Self {
795 id: version_delta.id,
796 prev_id: version_delta.prev_id,
797 group_deltas: version_delta
798 .group_deltas
799 .iter()
800 .map(|(group_id, deltas)| (*group_id, deltas.into()))
801 .collect(),
802 max_committed_epoch: version_delta.max_committed_epoch,
803 trivial_move: version_delta.trivial_move,
804 new_table_watermarks: version_delta
805 .new_table_watermarks
806 .iter()
807 .map(|(table_id, watermarks)| (*table_id, watermarks.into()))
808 .collect(),
809 removed_table_ids: version_delta.removed_table_ids.iter().copied().collect(),
810 change_log_delta: version_delta
811 .change_log_delta
812 .iter()
813 .map(|(table_id, log_delta)| (*table_id, log_delta.into()))
814 .collect(),
815 state_table_info_delta: version_delta.state_table_info_delta.clone(),
816 vector_index_delta: version_delta
817 .vector_index_delta
818 .iter()
819 .map(|(table_id, delta)| (*table_id, delta.clone().into()))
820 .collect(),
821 }
822 }
823}
824
825impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
826where
827 PbSstableInfo: From<T>,
828{
829 fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
830 #[expect(deprecated)]
831 Self {
832 id: version_delta.id,
833 prev_id: version_delta.prev_id,
834 group_deltas: version_delta
835 .group_deltas
836 .into_iter()
837 .map(|(group_id, deltas)| (group_id, deltas.into()))
838 .collect(),
839 max_committed_epoch: version_delta.max_committed_epoch,
840 trivial_move: version_delta.trivial_move,
841 new_table_watermarks: version_delta
842 .new_table_watermarks
843 .into_iter()
844 .map(|(table_id, watermarks)| (table_id, watermarks.into()))
845 .collect(),
846 removed_table_ids: version_delta.removed_table_ids.into_iter().collect(),
847 change_log_delta: version_delta
848 .change_log_delta
849 .into_iter()
850 .map(|(table_id, log_delta)| (table_id, log_delta.into()))
851 .collect(),
852 state_table_info_delta: version_delta.state_table_info_delta,
853 vector_index_delta: version_delta
854 .vector_index_delta
855 .into_iter()
856 .map(|(table_id, delta)| (table_id, delta.into()))
857 .collect(),
858 }
859 }
860}
861
862impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
863where
864 T: From<PbSstableInfo>,
865{
866 fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
867 #[expect(deprecated)]
868 Self {
869 id: pb_version_delta.id,
870 prev_id: pb_version_delta.prev_id,
871 group_deltas: pb_version_delta
872 .group_deltas
873 .into_iter()
874 .map(|(group_id, deltas)| (group_id, deltas.into()))
875 .collect(),
876 max_committed_epoch: pb_version_delta.max_committed_epoch,
877 trivial_move: pb_version_delta.trivial_move,
878 new_table_watermarks: pb_version_delta
879 .new_table_watermarks
880 .into_iter()
881 .map(|(table_id, watermarks)| (table_id, watermarks.into()))
882 .collect(),
883 removed_table_ids: pb_version_delta.removed_table_ids.into_iter().collect(),
884 change_log_delta: pb_version_delta
885 .change_log_delta
886 .into_iter()
887 .map(|(table_id, log_delta)| (table_id, log_delta.into()))
888 .collect(),
889 state_table_info_delta: pb_version_delta.state_table_info_delta,
890 vector_index_delta: pb_version_delta
891 .vector_index_delta
892 .into_iter()
893 .map(|(table_id, delta)| (table_id, delta.into()))
894 .collect(),
895 }
896 }
897}
898
/// A change to the SSTs of a single level, mirroring `PbIntraLevelDelta`
/// field by field. `T` is the SST info type.
#[derive(Debug, PartialEq, Clone)]
pub struct IntraLevelDeltaCommon<T> {
    /// Index of the level this delta targets.
    pub level_idx: u32,
    /// Id of the L0 sub level; copied verbatim to and from the protobuf message.
    pub l0_sub_level_id: u64,
    /// Ids of the SSTs removed by this delta.
    pub removed_table_ids: HashSet<HummockSstableId>,
    /// Infos of the SSTs inserted by this delta.
    pub inserted_table_infos: Vec<T>,
    /// Copied verbatim to and from the protobuf message.
    pub vnode_partition_count: u32,
    /// Copied verbatim to and from the protobuf message.
    pub compaction_group_version_id: u64,
}
908
/// Intra-level delta whose inserted SSTs are described by `SstableInfo`.
pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
910
911impl IntraLevelDelta {
912 pub fn estimated_encode_len(&self) -> usize {
913 size_of::<u32>()
914 + size_of::<u64>()
915 + self.removed_table_ids.len() * size_of::<u32>()
916 + self
917 .inserted_table_infos
918 .iter()
919 .map(|sst| sst.estimated_encode_len())
920 .sum::<usize>()
921 + size_of::<u32>()
922 }
923}
924
925impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
926where
927 T: From<PbSstableInfo>,
928{
929 fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
930 Self {
931 level_idx: pb_intra_level_delta.level_idx,
932 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
933 removed_table_ids: HashSet::from_iter(
934 pb_intra_level_delta.removed_table_ids.iter().copied(),
935 ),
936 inserted_table_infos: pb_intra_level_delta
937 .inserted_table_infos
938 .into_iter()
939 .map(Into::into)
940 .collect_vec(),
941 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
942 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
943 }
944 }
945}
946
947impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
948where
949 PbSstableInfo: From<T>,
950{
951 fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
952 Self {
953 level_idx: intra_level_delta.level_idx,
954 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
955 removed_table_ids: intra_level_delta.removed_table_ids.into_iter().collect(),
956 inserted_table_infos: intra_level_delta
957 .inserted_table_infos
958 .into_iter()
959 .map(Into::into)
960 .collect_vec(),
961 vnode_partition_count: intra_level_delta.vnode_partition_count,
962 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
963 }
964 }
965}
966
967impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
968where
969 PbSstableInfo: for<'a> From<&'a T>,
970{
971 fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
972 Self {
973 level_idx: intra_level_delta.level_idx,
974 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
975 removed_table_ids: intra_level_delta
976 .removed_table_ids
977 .iter()
978 .copied()
979 .collect(),
980 inserted_table_infos: intra_level_delta
981 .inserted_table_infos
982 .iter()
983 .map(Into::into)
984 .collect_vec(),
985 vnode_partition_count: intra_level_delta.vnode_partition_count,
986 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
987 }
988 }
989}
990
991impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
992where
993 T: for<'a> From<&'a PbSstableInfo>,
994{
995 fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
996 Self {
997 level_idx: pb_intra_level_delta.level_idx,
998 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
999 removed_table_ids: HashSet::from_iter(
1000 pb_intra_level_delta.removed_table_ids.iter().copied(),
1001 ),
1002 inserted_table_infos: pb_intra_level_delta
1003 .inserted_table_infos
1004 .iter()
1005 .map(Into::into)
1006 .collect_vec(),
1007 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
1008 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
1009 }
1010 }
1011}
1012
impl IntraLevelDelta {
    /// Creates a delta from its parts; every argument is stored unchanged in
    /// the field of the same name.
    pub fn new(
        level_idx: u32,
        l0_sub_level_id: u64,
        removed_table_ids: HashSet<HummockSstableId>,
        inserted_table_infos: Vec<SstableInfo>,
        vnode_partition_count: u32,
        compaction_group_version_id: u64,
    ) -> Self {
        Self {
            level_idx,
            l0_sub_level_id,
            removed_table_ids,
            inserted_table_infos,
            vnode_partition_count,
            compaction_group_version_id,
        }
    }
}
1032
/// One change applied to a compaction group, mirroring the variants of the
/// protobuf `DeltaType`. `T` is the SST info type.
#[derive(Debug, PartialEq, Clone)]
pub enum GroupDeltaCommon<T> {
    /// Adds a new L0 sub level made of the given SSTs.
    NewL0SubLevel(Vec<T>),
    /// Changes the SSTs of a single level.
    IntraLevel(IntraLevelDeltaCommon<T>),
    /// Carries a `PbGroupConstruct` message; boxed, unlike the other payloads.
    GroupConstruct(Box<PbGroupConstruct>),
    /// Carries a `PbGroupDestroy` message.
    GroupDestroy(PbGroupDestroy),
    /// Carries a `PbGroupMerge` message.
    GroupMerge(PbGroupMerge),
    /// Carries the ids of the tables to truncate.
    TruncateTables(HashSet<TableId>),
}
1042
/// Group delta whose SSTs are described by `SstableInfo`.
pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
1044
1045impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
1046where
1047 T: From<PbSstableInfo>,
1048{
1049 fn from(pb_group_delta: PbGroupDelta) -> Self {
1050 match pb_group_delta.delta_type {
1051 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1052 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1053 }
1054 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1055 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
1056 }
1057 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1058 GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1059 }
1060 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1061 GroupDeltaCommon::GroupMerge(pb_group_merge)
1062 }
1063 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1064 pb_new_sub_level
1065 .inserted_table_infos
1066 .into_iter()
1067 .map(T::from)
1068 .collect(),
1069 ),
1070 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1071 GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids.into_iter().collect())
1072 }
1073
1074 None => panic!("delta_type is not set"),
1075 }
1076 }
1077}
1078
1079impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1080where
1081 PbSstableInfo: From<T>,
1082{
1083 fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1084 match group_delta {
1085 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1086 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1087 },
1088 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1089 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1090 },
1091 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1092 delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1093 },
1094 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1095 delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1096 },
1097 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1098 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1099 inserted_table_infos: new_sub_level
1100 .into_iter()
1101 .map(PbSstableInfo::from)
1102 .collect(),
1103 })),
1104 },
1105 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1106 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1107 table_ids: table_ids.iter().copied().collect(),
1108 })),
1109 },
1110 }
1111 }
1112}
1113
1114impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1115where
1116 PbSstableInfo: for<'a> From<&'a T>,
1117{
1118 fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1119 match group_delta {
1120 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1121 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1122 },
1123 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1124 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1125 },
1126 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1127 delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1128 },
1129 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1130 delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1131 },
1132 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1133 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1134 inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1135 })),
1136 },
1137 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1138 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1139 table_ids: table_ids.iter().copied().collect(),
1140 })),
1141 },
1142 }
1143 }
1144}
1145
1146impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1147where
1148 T: for<'a> From<&'a PbSstableInfo>,
1149{
1150 fn from(pb_group_delta: &PbGroupDelta) -> Self {
1151 match &pb_group_delta.delta_type {
1152 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1153 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1154 }
1155 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1156 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1157 }
1158 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1159 GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1160 }
1161 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1162 GroupDeltaCommon::GroupMerge(*pb_group_merge)
1163 }
1164 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1165 pb_new_sub_level
1166 .inserted_table_infos
1167 .iter()
1168 .map(T::from)
1169 .collect(),
1170 ),
1171 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1172 GroupDeltaCommon::TruncateTables(
1173 pb_truncate_tables.table_ids.iter().copied().collect(),
1174 )
1175 }
1176 None => panic!("delta_type is not set"),
1177 }
1178 }
1179}
1180
1181#[derive(Debug, PartialEq, Clone)]
1182pub struct GroupDeltasCommon<T> {
1183 pub group_deltas: Vec<GroupDeltaCommon<T>>,
1184}
1185
1186impl<T> Default for GroupDeltasCommon<T> {
1187 fn default() -> Self {
1188 Self {
1189 group_deltas: vec![],
1190 }
1191 }
1192}
1193
/// [`GroupDeltasCommon`] instantiated with [`SstableInfo`] as its element type.
pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1195
1196impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1197where
1198 T: From<PbSstableInfo>,
1199{
1200 fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1201 Self {
1202 group_deltas: pb_group_deltas
1203 .group_deltas
1204 .into_iter()
1205 .map(GroupDeltaCommon::from)
1206 .collect_vec(),
1207 }
1208 }
1209}
1210
1211impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1212where
1213 PbSstableInfo: From<T>,
1214{
1215 fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1216 Self {
1217 group_deltas: group_deltas
1218 .group_deltas
1219 .into_iter()
1220 .map(|group_delta| group_delta.into())
1221 .collect_vec(),
1222 }
1223 }
1224}
1225
1226impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1227where
1228 PbSstableInfo: for<'a> From<&'a T>,
1229{
1230 fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1231 Self {
1232 group_deltas: group_deltas
1233 .group_deltas
1234 .iter()
1235 .map(|group_delta| group_delta.into())
1236 .collect_vec(),
1237 }
1238 }
1239}
1240
1241impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1242where
1243 T: for<'a> From<&'a PbSstableInfo>,
1244{
1245 fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1246 Self {
1247 group_deltas: pb_group_deltas
1248 .group_deltas
1249 .iter()
1250 .map(GroupDeltaCommon::from)
1251 .collect_vec(),
1252 }
1253 }
1254}
1255
1256impl<T> GroupDeltasCommon<T>
1257where
1258 PbSstableInfo: for<'a> From<&'a T>,
1259{
1260 pub fn to_protobuf(&self) -> PbGroupDeltas {
1261 self.into()
1262 }
1263}
1264
1265impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1266 #[expect(deprecated)]
1267 fn from(delta: HummockVersionDelta) -> Self {
1268 Self {
1269 id: delta.id,
1270 prev_id: delta.prev_id,
1271 group_deltas: delta.group_deltas,
1272 max_committed_epoch: delta.max_committed_epoch,
1273 trivial_move: delta.trivial_move,
1274 new_table_watermarks: delta.new_table_watermarks,
1275 removed_table_ids: delta.removed_table_ids,
1276 change_log_delta: delta
1277 .change_log_delta
1278 .into_iter()
1279 .map(|(k, v)| {
1280 (
1281 k,
1282 ChangeLogDeltaCommon {
1283 truncate_epoch: v.truncate_epoch,
1284 new_log: EpochNewChangeLogCommon {
1285 new_value: Vec::new(),
1286 old_value: Vec::new(),
1287 non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1288 checkpoint_epoch: v.new_log.checkpoint_epoch,
1289 },
1290 },
1291 )
1292 })
1293 .collect(),
1294 state_table_info_delta: delta.state_table_info_delta,
1295 vector_index_delta: delta.vector_index_delta,
1296 }
1297 }
1298}
1299
1300impl From<HummockVersion> for LocalHummockVersion {
1301 #[expect(deprecated)]
1302 fn from(version: HummockVersion) -> Self {
1303 Self {
1304 id: version.id,
1305 levels: version.levels,
1306 max_committed_epoch: version.max_committed_epoch,
1307 table_watermarks: version.table_watermarks,
1308 table_change_log: version
1309 .table_change_log
1310 .into_iter()
1311 .map(|(k, v)| {
1312 let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1313 .change_log_into_iter()
1314 .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1315 new_value: Vec::new(),
1316 old_value: Vec::new(),
1317 non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1318 checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1319 })
1320 .collect();
1321 (k, TableChangeLogCommon::new(epoch_new_change_logs))
1322 })
1323 .collect(),
1324 state_table_info: version.state_table_info,
1325 vector_indexes: version.vector_indexes,
1326 }
1327 }
1328}