1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{
30 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
31};
32use crate::compaction_group::StaticCompactionGroupId;
33use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
34use crate::level::LevelsCommon;
35use crate::sstable_info::SstableInfo;
36use crate::table_watermark::TableWatermarks;
37use crate::vector_index::{VectorIndex, VectorIndexDelta};
38use crate::{
39 CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
40 HummockSstableObjectId, HummockVersionId,
41};
42
43#[derive(Debug, Clone, PartialEq)]
44pub struct HummockVersionStateTableInfo {
45 state_table_info: HashMap<TableId, PbStateTableInfo>,
46
47 compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
49}
50
51impl HummockVersionStateTableInfo {
52 pub fn empty() -> Self {
53 Self {
54 state_table_info: HashMap::new(),
55 compaction_group_member_tables: HashMap::new(),
56 }
57 }
58
59 fn build_compaction_group_member_tables(
60 state_table_info: &HashMap<TableId, PbStateTableInfo>,
61 ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
62 let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
63 for (table_id, info) in state_table_info {
64 assert!(
65 ret.entry(info.compaction_group_id)
66 .or_default()
67 .insert(*table_id)
68 );
69 }
70 ret
71 }
72
73 pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
74 self.state_table_info
75 .iter()
76 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
77 .collect()
78 }
79
80 pub fn from_protobuf(state_table_info: &HashMap<TableId, PbStateTableInfo>) -> Self {
81 let state_table_info = state_table_info
82 .iter()
83 .map(|(table_id, info)| (*table_id, *info))
84 .collect();
85 let compaction_group_member_tables =
86 Self::build_compaction_group_member_tables(&state_table_info);
87 Self {
88 state_table_info,
89 compaction_group_member_tables,
90 }
91 }
92
93 pub fn apply_delta(
94 &mut self,
95 delta: &HashMap<TableId, StateTableInfoDelta>,
96 removed_table_id: &HashSet<TableId>,
97 ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
98 let mut changed_table = HashMap::new();
99 let mut has_bumped_committed_epoch = false;
100 fn remove_table_from_compaction_group(
101 compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
102 compaction_group_id: CompactionGroupId,
103 table_id: TableId,
104 ) {
105 let member_tables = compaction_group_member_tables
106 .get_mut(&compaction_group_id)
107 .expect("should exist");
108 assert!(member_tables.remove(&table_id));
109 if member_tables.is_empty() {
110 assert!(
111 compaction_group_member_tables
112 .remove(&compaction_group_id)
113 .is_some()
114 );
115 }
116 }
117 for table_id in removed_table_id {
118 if let Some(prev_info) = self.state_table_info.remove(table_id) {
119 remove_table_from_compaction_group(
120 &mut self.compaction_group_member_tables,
121 prev_info.compaction_group_id,
122 *table_id,
123 );
124 assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
125 } else {
126 warn!(
127 %table_id,
128 "table to remove does not exist"
129 );
130 }
131 }
132 for (table_id, delta) in delta {
133 if removed_table_id.contains(table_id) {
134 continue;
135 }
136 let new_info = StateTableInfo {
137 committed_epoch: delta.committed_epoch,
138 compaction_group_id: delta.compaction_group_id,
139 };
140 match self.state_table_info.entry(*table_id) {
141 Entry::Occupied(mut entry) => {
142 let prev_info = entry.get_mut();
143 assert!(
144 new_info.committed_epoch >= prev_info.committed_epoch,
145 "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
146 table_id,
147 prev_info,
148 new_info
149 );
150 if new_info.committed_epoch > prev_info.committed_epoch {
151 has_bumped_committed_epoch = true;
152 }
153 if prev_info.compaction_group_id != new_info.compaction_group_id {
154 remove_table_from_compaction_group(
156 &mut self.compaction_group_member_tables,
157 prev_info.compaction_group_id,
158 *table_id,
159 );
160 assert!(
161 self.compaction_group_member_tables
162 .entry(new_info.compaction_group_id)
163 .or_default()
164 .insert(*table_id)
165 );
166 }
167 let prev_info = replace(prev_info, new_info);
168 changed_table.insert(*table_id, Some(prev_info));
169 }
170 Entry::Vacant(entry) => {
171 assert!(
172 self.compaction_group_member_tables
173 .entry(new_info.compaction_group_id)
174 .or_default()
175 .insert(*table_id)
176 );
177 has_bumped_committed_epoch = true;
178 entry.insert(new_info);
179 changed_table.insert(*table_id, None);
180 }
181 }
182 }
183 debug_assert_eq!(
184 self.compaction_group_member_tables,
185 Self::build_compaction_group_member_tables(&self.state_table_info)
186 );
187 (changed_table, has_bumped_committed_epoch)
188 }
189
190 pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
191 &self.state_table_info
192 }
193
194 pub fn compaction_group_member_table_ids(
195 &self,
196 compaction_group_id: CompactionGroupId,
197 ) -> &BTreeSet<TableId> {
198 static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
199 self.compaction_group_member_tables
200 .get(&compaction_group_id)
201 .unwrap_or_else(|| EMPTY_SET.deref())
202 }
203
204 pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
205 &self.compaction_group_member_tables
206 }
207
208 pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
209 self.state_table_info
210 .values()
211 .map(|info| info.committed_epoch)
212 .max()
213 }
214}
215
216#[derive(Debug, Clone, PartialEq)]
217pub struct HummockVersionCommon<T, L = T> {
218 pub id: HummockVersionId,
219 pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
220 #[deprecated]
221 pub(crate) max_committed_epoch: u64,
222 pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
223 pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
224 pub state_table_info: HummockVersionStateTableInfo,
225 pub vector_indexes: HashMap<TableId, VectorIndex>,
226}
227
228pub type HummockVersion = HummockVersionCommon<SstableInfo>;
229
230pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
231
232impl Default for HummockVersion {
233 fn default() -> Self {
234 HummockVersion::from(&PbHummockVersion::default())
235 }
236}
237
238impl<T> HummockVersionCommon<T>
239where
240 T: for<'a> From<&'a PbSstableInfo>,
241 PbSstableInfo: for<'a> From<&'a T>,
242{
243 pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
246 pb_version.into()
247 }
248
249 pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
252 pb_version.into()
253 }
254
255 pub fn to_protobuf(&self) -> PbHummockVersion {
256 self.into()
257 }
258}
259
260impl HummockVersion {
261 pub fn estimated_encode_len(&self) -> usize {
262 self.levels.len() * size_of::<CompactionGroupId>()
263 + self
264 .levels
265 .values()
266 .map(|level| level.estimated_encode_len())
267 .sum::<usize>()
268 + self.table_watermarks.len() * size_of::<u32>()
269 + self
270 .table_watermarks
271 .values()
272 .map(|table_watermark| table_watermark.estimated_encode_len())
273 .sum::<usize>()
274 + self
275 .table_change_log
276 .values()
277 .map(|c| {
278 c.iter()
279 .map(|l| {
280 l.old_value
281 .iter()
282 .chain(l.new_value.iter())
283 .map(|s| s.estimated_encode_len())
284 .sum::<usize>()
285 })
286 .sum::<usize>()
287 })
288 .sum::<usize>()
289 }
290}
291
292impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
293where
294 T: for<'a> From<&'a PbSstableInfo>,
295{
296 fn from(pb_version: &PbHummockVersion) -> Self {
297 #[expect(deprecated)]
298 Self {
299 id: HummockVersionId(pb_version.id),
300 levels: pb_version
301 .levels
302 .iter()
303 .map(|(group_id, levels)| {
304 (*group_id as CompactionGroupId, LevelsCommon::from(levels))
305 })
306 .collect(),
307 max_committed_epoch: pb_version.max_committed_epoch,
308 table_watermarks: pb_version
309 .table_watermarks
310 .iter()
311 .map(|(table_id, table_watermark)| {
312 (*table_id, Arc::new(TableWatermarks::from(table_watermark)))
313 })
314 .collect(),
315 table_change_log: pb_version
316 .table_change_logs
317 .iter()
318 .map(|(table_id, change_log)| {
319 (*table_id, TableChangeLogCommon::from_protobuf(change_log))
320 })
321 .collect(),
322 state_table_info: HummockVersionStateTableInfo::from_protobuf(
323 &pb_version.state_table_info,
324 ),
325 vector_indexes: pb_version
326 .vector_indexes
327 .iter()
328 .map(|(table_id, index)| (*table_id, index.clone().into()))
329 .collect(),
330 }
331 }
332}
333
334impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
335where
336 PbSstableInfo: for<'a> From<&'a T>,
337{
338 fn from(version: &HummockVersionCommon<T>) -> Self {
339 #[expect(deprecated)]
340 Self {
341 id: version.id.0,
342 levels: version
343 .levels
344 .iter()
345 .map(|(group_id, levels)| (*group_id as _, levels.into()))
346 .collect(),
347 max_committed_epoch: version.max_committed_epoch,
348 table_watermarks: version
349 .table_watermarks
350 .iter()
351 .map(|(table_id, watermark)| (*table_id, watermark.as_ref().into()))
352 .collect(),
353 table_change_logs: version
354 .table_change_log
355 .iter()
356 .map(|(table_id, change_log)| (*table_id, change_log.to_protobuf()))
357 .collect(),
358 state_table_info: version.state_table_info.state_table_info.clone(),
359 vector_indexes: version
360 .vector_indexes
361 .iter()
362 .map(|(table_id, index)| (*table_id, index.clone().into()))
363 .collect(),
364 }
365 }
366}
367
368impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
369where
370 PbSstableInfo: From<T>,
371 PbSstableInfo: for<'a> From<&'a T>,
372{
373 fn from(version: HummockVersionCommon<T>) -> Self {
374 #[expect(deprecated)]
375 Self {
376 id: version.id.0,
377 levels: version
378 .levels
379 .into_iter()
380 .map(|(group_id, levels)| (group_id as _, levels.into()))
381 .collect(),
382 max_committed_epoch: version.max_committed_epoch,
383 table_watermarks: version
384 .table_watermarks
385 .into_iter()
386 .map(|(table_id, watermark)| (table_id, watermark.as_ref().into()))
387 .collect(),
388 table_change_logs: version
389 .table_change_log
390 .into_iter()
391 .map(|(table_id, change_log)| (table_id, change_log.to_protobuf()))
392 .collect(),
393 state_table_info: version.state_table_info.state_table_info.clone(),
394 vector_indexes: version
395 .vector_indexes
396 .into_iter()
397 .map(|(table_id, index)| (table_id, index.into()))
398 .collect(),
399 }
400 }
401}
402
403impl HummockVersion {
404 pub fn next_version_id(&self) -> HummockVersionId {
405 self.id.next()
406 }
407
408 pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
409 self.state_table_info.state_table_info.is_empty()
411 && self.levels.values().any(|group| {
412 #[expect(deprecated)]
414 !group.member_table_ids.is_empty()
415 })
416 }
417
418 pub fn may_fill_backward_compatible_state_table_info_delta(
419 &self,
420 delta: &mut HummockVersionDelta,
421 ) {
422 #[expect(deprecated)]
423 for (cg_id, group) in &self.levels {
425 for table_id in &group.member_table_ids {
426 assert!(
427 delta
428 .state_table_info_delta
429 .insert(
430 (*table_id).into(),
431 StateTableInfoDelta {
432 committed_epoch: self.max_committed_epoch,
433 compaction_group_id: *cg_id,
434 }
435 )
436 .is_none(),
437 "duplicate table id {} in cg {}",
438 table_id,
439 cg_id
440 );
441 }
442 }
443 }
444
445 pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
446 #[expect(deprecated)]
447 let mut init_version = HummockVersion {
448 id: FIRST_VERSION_ID,
449 levels: Default::default(),
450 max_committed_epoch: INVALID_EPOCH,
451 table_watermarks: HashMap::new(),
452 table_change_log: HashMap::new(),
453 state_table_info: HummockVersionStateTableInfo::empty(),
454 vector_indexes: Default::default(),
455 };
456 for group_id in [
457 StaticCompactionGroupId::StateDefault as CompactionGroupId,
458 StaticCompactionGroupId::MaterializedView as CompactionGroupId,
459 ] {
460 init_version.levels.insert(
461 group_id,
462 build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
463 );
464 }
465 init_version
466 }
467
468 pub fn version_delta_after(&self) -> HummockVersionDelta {
469 #[expect(deprecated)]
470 HummockVersionDelta {
471 id: self.next_version_id(),
472 prev_id: self.id,
473 trivial_move: false,
474 max_committed_epoch: self.max_committed_epoch,
475 group_deltas: Default::default(),
476 new_table_watermarks: HashMap::new(),
477 removed_table_ids: HashSet::new(),
478 change_log_delta: HashMap::new(),
479 state_table_info_delta: Default::default(),
480 vector_index_delta: Default::default(),
481 }
482 }
483
484 pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
485 let table_change_log = {
486 let mut table_change_log = HashMap::new();
487 for (table_id, log) in &mut self.table_change_log {
488 let change_log_iter =
489 log.change_log_iter_mut()
490 .map(|item| EpochNewChangeLogCommon {
491 new_value: std::mem::take(&mut item.new_value),
492 old_value: std::mem::take(&mut item.old_value),
493 non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
494 checkpoint_epoch: item.checkpoint_epoch,
495 });
496 table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
497 }
498
499 table_change_log
500 };
501
502 let local_version = LocalHummockVersion::from(self);
503
504 (local_version, table_change_log)
505 }
506}
507
508impl<T, L> HummockVersionCommon<T, L> {
509 pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
510 self.state_table_info
511 .info()
512 .get(&table_id)
513 .map(|info| info.committed_epoch)
514 }
515}
516
517#[derive(Debug, PartialEq, Clone)]
518pub struct HummockVersionDeltaCommon<T, L = T> {
519 pub id: HummockVersionId,
520 pub prev_id: HummockVersionId,
521 pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
522 #[deprecated]
523 pub(crate) max_committed_epoch: u64,
524 pub trivial_move: bool,
525 pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
526 pub removed_table_ids: HashSet<TableId>,
527 pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
528 pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
529 pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
530}
531
532pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;
533
534pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
535
536impl Default for HummockVersionDelta {
537 fn default() -> Self {
538 HummockVersionDelta::from(&PbHummockVersionDelta::default())
539 }
540}
541
542impl<T> HummockVersionDeltaCommon<T>
543where
544 T: for<'a> From<&'a PbSstableInfo>,
545 PbSstableInfo: for<'a> From<&'a T>,
546{
547 pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
550 delta.into()
551 }
552
553 pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
556 delta.into()
557 }
558
559 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
560 self.into()
561 }
562}
563
564pub trait SstableIdReader {
565 fn sst_id(&self) -> HummockSstableId;
566}
567
568pub trait ObjectIdReader {
569 fn object_id(&self) -> HummockSstableObjectId;
570}
571
572impl<T> HummockVersionDeltaCommon<T>
573where
574 T: SstableIdReader + ObjectIdReader,
575{
576 pub fn newly_added_object_ids(
581 &self,
582 exclude_table_change_log: bool,
583 ) -> HashSet<HummockObjectId> {
584 match HummockObjectId::Sstable(0.into()) {
588 HummockObjectId::Sstable(_) => {}
589 HummockObjectId::VectorFile(_) => {}
590 HummockObjectId::HnswGraphFile(_) => {}
591 };
592 self.newly_added_sst_infos(exclude_table_change_log)
593 .map(|sst| HummockObjectId::Sstable(sst.object_id()))
594 .chain(
595 self.vector_index_delta
596 .values()
597 .flat_map(|vector_index_delta| {
598 vector_index_delta
599 .newly_added_objects()
600 .map(|(object_id, _)| object_id)
601 }),
602 )
603 .collect()
604 }
605
606 pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
607 self.newly_added_sst_infos(exclude_table_change_log)
608 .map(|sst| sst.sst_id())
609 .collect()
610 }
611}
612
613impl<T> HummockVersionDeltaCommon<T> {
614 pub fn newly_added_sst_infos(
615 &self,
616 exclude_table_change_log: bool,
617 ) -> impl Iterator<Item = &'_ T> {
618 let may_table_change_delta = if exclude_table_change_log {
619 None
620 } else {
621 Some(self.change_log_delta.values())
622 };
623 self.group_deltas
624 .values()
625 .flat_map(|group_deltas| {
626 group_deltas.group_deltas.iter().flat_map(|group_delta| {
627 let sst_slice = match &group_delta {
628 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
629 | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
630 inserted_table_infos,
631 ..
632 }) => Some(inserted_table_infos.iter()),
633 GroupDeltaCommon::GroupConstruct(_)
634 | GroupDeltaCommon::GroupDestroy(_)
635 | GroupDeltaCommon::GroupMerge(_)
636 | GroupDeltaCommon::TruncateTables(_) => None,
637 };
638 sst_slice.into_iter().flatten()
639 })
640 })
641 .chain(
642 may_table_change_delta
643 .map(|v| {
644 v.flat_map(|delta| {
645 let new_log = &delta.new_log;
646 new_log.new_value.iter().chain(new_log.old_value.iter())
647 })
648 })
649 .into_iter()
650 .flatten(),
651 )
652 }
653}
654
655impl HummockVersionDelta {
656 #[expect(deprecated)]
657 pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
658 self.max_committed_epoch
659 }
660}
661
662impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
663where
664 T: for<'a> From<&'a PbSstableInfo>,
665{
666 fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
667 #[expect(deprecated)]
668 Self {
669 id: HummockVersionId(pb_version_delta.id),
670 prev_id: HummockVersionId(pb_version_delta.prev_id),
671 group_deltas: pb_version_delta
672 .group_deltas
673 .iter()
674 .map(|(group_id, deltas)| {
675 (
676 *group_id as CompactionGroupId,
677 GroupDeltasCommon::from(deltas),
678 )
679 })
680 .collect(),
681 max_committed_epoch: pb_version_delta.max_committed_epoch,
682 trivial_move: pb_version_delta.trivial_move,
683 new_table_watermarks: pb_version_delta
684 .new_table_watermarks
685 .iter()
686 .map(|(table_id, watermarks)| (*table_id, TableWatermarks::from(watermarks)))
687 .collect(),
688 removed_table_ids: pb_version_delta.removed_table_ids.iter().copied().collect(),
689 change_log_delta: pb_version_delta
690 .change_log_delta
691 .iter()
692 .map(|(table_id, log_delta)| {
693 (
694 *table_id,
695 ChangeLogDeltaCommon {
696 truncate_epoch: log_delta.truncate_epoch,
697 new_log: log_delta.new_log.as_ref().unwrap().into(),
698 },
699 )
700 })
701 .collect(),
702
703 state_table_info_delta: pb_version_delta
704 .state_table_info_delta
705 .iter()
706 .map(|(table_id, delta)| (*table_id, *delta))
707 .collect(),
708 vector_index_delta: pb_version_delta
709 .vector_index_delta
710 .iter()
711 .map(|(table_id, delta)| (*table_id, delta.clone().into()))
712 .collect(),
713 }
714 }
715}
716
717impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
718where
719 PbSstableInfo: for<'a> From<&'a T>,
720{
721 fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
722 #[expect(deprecated)]
723 Self {
724 id: version_delta.id.0,
725 prev_id: version_delta.prev_id.0,
726 group_deltas: version_delta
727 .group_deltas
728 .iter()
729 .map(|(group_id, deltas)| (*group_id as _, deltas.into()))
730 .collect(),
731 max_committed_epoch: version_delta.max_committed_epoch,
732 trivial_move: version_delta.trivial_move,
733 new_table_watermarks: version_delta
734 .new_table_watermarks
735 .iter()
736 .map(|(table_id, watermarks)| (*table_id, watermarks.into()))
737 .collect(),
738 removed_table_ids: version_delta.removed_table_ids.iter().copied().collect(),
739 change_log_delta: version_delta
740 .change_log_delta
741 .iter()
742 .map(|(table_id, log_delta)| (*table_id, log_delta.into()))
743 .collect(),
744 state_table_info_delta: version_delta.state_table_info_delta.clone(),
745 vector_index_delta: version_delta
746 .vector_index_delta
747 .iter()
748 .map(|(table_id, delta)| (*table_id, delta.clone().into()))
749 .collect(),
750 }
751 }
752}
753
754impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
755where
756 PbSstableInfo: From<T>,
757{
758 fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
759 #[expect(deprecated)]
760 Self {
761 id: version_delta.id.0,
762 prev_id: version_delta.prev_id.0,
763 group_deltas: version_delta
764 .group_deltas
765 .into_iter()
766 .map(|(group_id, deltas)| (group_id as _, deltas.into()))
767 .collect(),
768 max_committed_epoch: version_delta.max_committed_epoch,
769 trivial_move: version_delta.trivial_move,
770 new_table_watermarks: version_delta
771 .new_table_watermarks
772 .into_iter()
773 .map(|(table_id, watermarks)| (table_id, watermarks.into()))
774 .collect(),
775 removed_table_ids: version_delta.removed_table_ids.into_iter().collect(),
776 change_log_delta: version_delta
777 .change_log_delta
778 .into_iter()
779 .map(|(table_id, log_delta)| (table_id, log_delta.into()))
780 .collect(),
781 state_table_info_delta: version_delta.state_table_info_delta.clone(),
782 vector_index_delta: version_delta
783 .vector_index_delta
784 .into_iter()
785 .map(|(table_id, delta)| (table_id, delta.into()))
786 .collect(),
787 }
788 }
789}
790
791impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
792where
793 T: From<PbSstableInfo>,
794{
795 fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
796 #[expect(deprecated)]
797 Self {
798 id: HummockVersionId(pb_version_delta.id),
799 prev_id: HummockVersionId(pb_version_delta.prev_id),
800 group_deltas: pb_version_delta
801 .group_deltas
802 .into_iter()
803 .map(|(group_id, deltas)| (group_id as CompactionGroupId, deltas.into()))
804 .collect(),
805 max_committed_epoch: pb_version_delta.max_committed_epoch,
806 trivial_move: pb_version_delta.trivial_move,
807 new_table_watermarks: pb_version_delta
808 .new_table_watermarks
809 .into_iter()
810 .map(|(table_id, watermarks)| (table_id, watermarks.into()))
811 .collect(),
812 removed_table_ids: pb_version_delta.removed_table_ids.into_iter().collect(),
813 change_log_delta: pb_version_delta
814 .change_log_delta
815 .iter()
816 .map(|(table_id, log_delta)| {
817 (
818 *table_id,
819 ChangeLogDeltaCommon {
820 new_log: log_delta.new_log.clone().unwrap().into(),
821 truncate_epoch: log_delta.truncate_epoch,
822 },
823 )
824 })
825 .collect(),
826 state_table_info_delta: pb_version_delta
827 .state_table_info_delta
828 .iter()
829 .map(|(table_id, delta)| (*table_id, *delta))
830 .collect(),
831 vector_index_delta: pb_version_delta
832 .vector_index_delta
833 .into_iter()
834 .map(|(table_id, delta)| (table_id, delta.into()))
835 .collect(),
836 }
837 }
838}
839
840#[derive(Debug, PartialEq, Clone)]
841pub struct IntraLevelDeltaCommon<T> {
842 pub level_idx: u32,
843 pub l0_sub_level_id: u64,
844 pub removed_table_ids: HashSet<HummockSstableId>,
845 pub inserted_table_infos: Vec<T>,
846 pub vnode_partition_count: u32,
847 pub compaction_group_version_id: u64,
848}
849
850pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
851
852impl IntraLevelDelta {
853 pub fn estimated_encode_len(&self) -> usize {
854 size_of::<u32>()
855 + size_of::<u64>()
856 + self.removed_table_ids.len() * size_of::<u32>()
857 + self
858 .inserted_table_infos
859 .iter()
860 .map(|sst| sst.estimated_encode_len())
861 .sum::<usize>()
862 + size_of::<u32>()
863 }
864}
865
866impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
867where
868 T: From<PbSstableInfo>,
869{
870 fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
871 Self {
872 level_idx: pb_intra_level_delta.level_idx,
873 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
874 removed_table_ids: HashSet::from_iter(
875 pb_intra_level_delta
876 .removed_table_ids
877 .iter()
878 .map(|sst_id| (*sst_id).into()),
879 ),
880 inserted_table_infos: pb_intra_level_delta
881 .inserted_table_infos
882 .into_iter()
883 .map(Into::into)
884 .collect_vec(),
885 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
886 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
887 }
888 }
889}
890
891impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
892where
893 PbSstableInfo: From<T>,
894{
895 fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
896 Self {
897 level_idx: intra_level_delta.level_idx,
898 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
899 removed_table_ids: intra_level_delta
900 .removed_table_ids
901 .into_iter()
902 .map(|sst_id| sst_id.inner())
903 .collect(),
904 inserted_table_infos: intra_level_delta
905 .inserted_table_infos
906 .into_iter()
907 .map(Into::into)
908 .collect_vec(),
909 vnode_partition_count: intra_level_delta.vnode_partition_count,
910 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
911 }
912 }
913}
914
915impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
916where
917 PbSstableInfo: for<'a> From<&'a T>,
918{
919 fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
920 Self {
921 level_idx: intra_level_delta.level_idx,
922 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
923 removed_table_ids: intra_level_delta
924 .removed_table_ids
925 .iter()
926 .map(|sst_id| sst_id.inner())
927 .collect(),
928 inserted_table_infos: intra_level_delta
929 .inserted_table_infos
930 .iter()
931 .map(Into::into)
932 .collect_vec(),
933 vnode_partition_count: intra_level_delta.vnode_partition_count,
934 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
935 }
936 }
937}
938
939impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
940where
941 T: for<'a> From<&'a PbSstableInfo>,
942{
943 fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
944 Self {
945 level_idx: pb_intra_level_delta.level_idx,
946 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
947 removed_table_ids: HashSet::from_iter(
948 pb_intra_level_delta
949 .removed_table_ids
950 .iter()
951 .map(|sst_id| (*sst_id).into()),
952 ),
953 inserted_table_infos: pb_intra_level_delta
954 .inserted_table_infos
955 .iter()
956 .map(Into::into)
957 .collect_vec(),
958 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
959 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
960 }
961 }
962}
963
964impl IntraLevelDelta {
965 pub fn new(
966 level_idx: u32,
967 l0_sub_level_id: u64,
968 removed_table_ids: HashSet<HummockSstableId>,
969 inserted_table_infos: Vec<SstableInfo>,
970 vnode_partition_count: u32,
971 compaction_group_version_id: u64,
972 ) -> Self {
973 Self {
974 level_idx,
975 l0_sub_level_id,
976 removed_table_ids,
977 inserted_table_infos,
978 vnode_partition_count,
979 compaction_group_version_id,
980 }
981 }
982}
983
984#[derive(Debug, PartialEq, Clone)]
985pub enum GroupDeltaCommon<T> {
986 NewL0SubLevel(Vec<T>),
987 IntraLevel(IntraLevelDeltaCommon<T>),
988 GroupConstruct(Box<PbGroupConstruct>),
989 GroupDestroy(PbGroupDestroy),
990 GroupMerge(PbGroupMerge),
991 TruncateTables(HashSet<TableId>),
992}
993
994pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
995
996impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
997where
998 T: From<PbSstableInfo>,
999{
1000 fn from(pb_group_delta: PbGroupDelta) -> Self {
1001 match pb_group_delta.delta_type {
1002 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1003 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1004 }
1005 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1006 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
1007 }
1008 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1009 GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1010 }
1011 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1012 GroupDeltaCommon::GroupMerge(pb_group_merge)
1013 }
1014 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1015 pb_new_sub_level
1016 .inserted_table_infos
1017 .into_iter()
1018 .map(T::from)
1019 .collect(),
1020 ),
1021 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1022 GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids.into_iter().collect())
1023 }
1024
1025 None => panic!("delta_type is not set"),
1026 }
1027 }
1028}
1029
1030impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1031where
1032 PbSstableInfo: From<T>,
1033{
1034 fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1035 match group_delta {
1036 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1037 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1038 },
1039 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1040 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1041 },
1042 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1043 delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1044 },
1045 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1046 delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1047 },
1048 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1049 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1050 inserted_table_infos: new_sub_level
1051 .into_iter()
1052 .map(PbSstableInfo::from)
1053 .collect(),
1054 })),
1055 },
1056 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1057 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1058 table_ids: table_ids.iter().copied().collect(),
1059 })),
1060 },
1061 }
1062 }
1063}
1064
1065impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1066where
1067 PbSstableInfo: for<'a> From<&'a T>,
1068{
1069 fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1070 match group_delta {
1071 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1072 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1073 },
1074 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1075 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1076 },
1077 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1078 delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1079 },
1080 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1081 delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1082 },
1083 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1084 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1085 inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1086 })),
1087 },
1088 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1089 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1090 table_ids: table_ids.iter().copied().collect(),
1091 })),
1092 },
1093 }
1094 }
1095}
1096
1097impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1098where
1099 T: for<'a> From<&'a PbSstableInfo>,
1100{
1101 fn from(pb_group_delta: &PbGroupDelta) -> Self {
1102 match &pb_group_delta.delta_type {
1103 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1104 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1105 }
1106 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1107 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1108 }
1109 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1110 GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1111 }
1112 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1113 GroupDeltaCommon::GroupMerge(*pb_group_merge)
1114 }
1115 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1116 pb_new_sub_level
1117 .inserted_table_infos
1118 .iter()
1119 .map(T::from)
1120 .collect(),
1121 ),
1122 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1123 GroupDeltaCommon::TruncateTables(
1124 pb_truncate_tables.table_ids.iter().copied().collect(),
1125 )
1126 }
1127 None => panic!("delta_type is not set"),
1128 }
1129 }
1130}
1131
1132#[derive(Debug, PartialEq, Clone)]
1133pub struct GroupDeltasCommon<T> {
1134 pub group_deltas: Vec<GroupDeltaCommon<T>>,
1135}
1136
1137impl<T> Default for GroupDeltasCommon<T> {
1138 fn default() -> Self {
1139 Self {
1140 group_deltas: vec![],
1141 }
1142 }
1143}
1144
1145pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1146
1147impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1148where
1149 T: From<PbSstableInfo>,
1150{
1151 fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1152 Self {
1153 group_deltas: pb_group_deltas
1154 .group_deltas
1155 .into_iter()
1156 .map(GroupDeltaCommon::from)
1157 .collect_vec(),
1158 }
1159 }
1160}
1161
1162impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1163where
1164 PbSstableInfo: From<T>,
1165{
1166 fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1167 Self {
1168 group_deltas: group_deltas
1169 .group_deltas
1170 .into_iter()
1171 .map(|group_delta| group_delta.into())
1172 .collect_vec(),
1173 }
1174 }
1175}
1176
1177impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1178where
1179 PbSstableInfo: for<'a> From<&'a T>,
1180{
1181 fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1182 Self {
1183 group_deltas: group_deltas
1184 .group_deltas
1185 .iter()
1186 .map(|group_delta| group_delta.into())
1187 .collect_vec(),
1188 }
1189 }
1190}
1191
1192impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1193where
1194 T: for<'a> From<&'a PbSstableInfo>,
1195{
1196 fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1197 Self {
1198 group_deltas: pb_group_deltas
1199 .group_deltas
1200 .iter()
1201 .map(GroupDeltaCommon::from)
1202 .collect_vec(),
1203 }
1204 }
1205}
1206
1207impl<T> GroupDeltasCommon<T>
1208where
1209 PbSstableInfo: for<'a> From<&'a T>,
1210{
1211 pub fn to_protobuf(&self) -> PbGroupDeltas {
1212 self.into()
1213 }
1214}
1215
1216impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1217 #[expect(deprecated)]
1218 fn from(delta: HummockVersionDelta) -> Self {
1219 Self {
1220 id: delta.id,
1221 prev_id: delta.prev_id,
1222 group_deltas: delta.group_deltas,
1223 max_committed_epoch: delta.max_committed_epoch,
1224 trivial_move: delta.trivial_move,
1225 new_table_watermarks: delta.new_table_watermarks,
1226 removed_table_ids: delta.removed_table_ids,
1227 change_log_delta: delta
1228 .change_log_delta
1229 .into_iter()
1230 .map(|(k, v)| {
1231 (
1232 k,
1233 ChangeLogDeltaCommon {
1234 truncate_epoch: v.truncate_epoch,
1235 new_log: EpochNewChangeLogCommon {
1236 new_value: Vec::new(),
1237 old_value: Vec::new(),
1238 non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1239 checkpoint_epoch: v.new_log.checkpoint_epoch,
1240 },
1241 },
1242 )
1243 })
1244 .collect(),
1245 state_table_info_delta: delta.state_table_info_delta,
1246 vector_index_delta: delta.vector_index_delta,
1247 }
1248 }
1249}
1250
1251impl From<HummockVersion> for LocalHummockVersion {
1252 #[expect(deprecated)]
1253 fn from(version: HummockVersion) -> Self {
1254 Self {
1255 id: version.id,
1256 levels: version.levels,
1257 max_committed_epoch: version.max_committed_epoch,
1258 table_watermarks: version.table_watermarks,
1259 table_change_log: version
1260 .table_change_log
1261 .into_iter()
1262 .map(|(k, v)| {
1263 let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1264 .change_log_into_iter()
1265 .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1266 new_value: Vec::new(),
1267 old_value: Vec::new(),
1268 non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1269 checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1270 })
1271 .collect();
1272 (k, TableChangeLogCommon::new(epoch_new_change_logs))
1273 })
1274 .collect(),
1275 state_table_info: version.state_table_info,
1276 vector_indexes: version.vector_indexes,
1277 }
1278 }
1279}