1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::{
27 CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge,
28 PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbNewL0SubLevel, PbSstableInfo,
29 PbStateTableInfo, StateTableInfo, StateTableInfoDelta,
30};
31use tracing::warn;
32
33use crate::change_log::{
34 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
35};
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
38use crate::level::LevelsCommon;
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::TableWatermarks;
41use crate::{
42 CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
43 HummockSstableObjectId, HummockVersionId,
44};
45
/// Per-table state info of a Hummock version, together with an index from
/// compaction group id to the tables that belong to it.
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionStateTableInfo {
    /// Info of every tracked state table, keyed by table id.
    state_table_info: HashMap<TableId, PbStateTableInfo>,

    /// Member tables of each compaction group. This is derived from
    /// `state_table_info` and is kept in sync with it by `apply_delta`.
    compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
}
53
54impl HummockVersionStateTableInfo {
55 pub fn empty() -> Self {
56 Self {
57 state_table_info: HashMap::new(),
58 compaction_group_member_tables: HashMap::new(),
59 }
60 }
61
62 fn build_compaction_group_member_tables(
63 state_table_info: &HashMap<TableId, PbStateTableInfo>,
64 ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
65 let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
66 for (table_id, info) in state_table_info {
67 assert!(
68 ret.entry(info.compaction_group_id)
69 .or_default()
70 .insert(*table_id)
71 );
72 }
73 ret
74 }
75
76 pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
77 self.state_table_info
78 .iter()
79 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
80 .collect()
81 }
82
83 pub fn from_protobuf(state_table_info: &HashMap<u32, PbStateTableInfo>) -> Self {
84 let state_table_info = state_table_info
85 .iter()
86 .map(|(table_id, info)| (TableId::new(*table_id), *info))
87 .collect();
88 let compaction_group_member_tables =
89 Self::build_compaction_group_member_tables(&state_table_info);
90 Self {
91 state_table_info,
92 compaction_group_member_tables,
93 }
94 }
95
96 pub fn to_protobuf(&self) -> HashMap<u32, PbStateTableInfo> {
97 self.state_table_info
98 .iter()
99 .map(|(table_id, info)| (table_id.table_id, *info))
100 .collect()
101 }
102
103 pub fn apply_delta(
104 &mut self,
105 delta: &HashMap<TableId, StateTableInfoDelta>,
106 removed_table_id: &HashSet<TableId>,
107 ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
108 let mut changed_table = HashMap::new();
109 let mut has_bumped_committed_epoch = false;
110 fn remove_table_from_compaction_group(
111 compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
112 compaction_group_id: CompactionGroupId,
113 table_id: TableId,
114 ) {
115 let member_tables = compaction_group_member_tables
116 .get_mut(&compaction_group_id)
117 .expect("should exist");
118 assert!(member_tables.remove(&table_id));
119 if member_tables.is_empty() {
120 assert!(
121 compaction_group_member_tables
122 .remove(&compaction_group_id)
123 .is_some()
124 );
125 }
126 }
127 for table_id in removed_table_id {
128 if let Some(prev_info) = self.state_table_info.remove(table_id) {
129 remove_table_from_compaction_group(
130 &mut self.compaction_group_member_tables,
131 prev_info.compaction_group_id,
132 *table_id,
133 );
134 assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
135 } else {
136 warn!(
137 table_id = table_id.table_id,
138 "table to remove does not exist"
139 );
140 }
141 }
142 for (table_id, delta) in delta {
143 if removed_table_id.contains(table_id) {
144 continue;
145 }
146 let new_info = StateTableInfo {
147 committed_epoch: delta.committed_epoch,
148 compaction_group_id: delta.compaction_group_id,
149 };
150 match self.state_table_info.entry(*table_id) {
151 Entry::Occupied(mut entry) => {
152 let prev_info = entry.get_mut();
153 assert!(
154 new_info.committed_epoch >= prev_info.committed_epoch,
155 "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
156 table_id.table_id,
157 prev_info,
158 new_info
159 );
160 if new_info.committed_epoch > prev_info.committed_epoch {
161 has_bumped_committed_epoch = true;
162 }
163 if prev_info.compaction_group_id != new_info.compaction_group_id {
164 remove_table_from_compaction_group(
166 &mut self.compaction_group_member_tables,
167 prev_info.compaction_group_id,
168 *table_id,
169 );
170 assert!(
171 self.compaction_group_member_tables
172 .entry(new_info.compaction_group_id)
173 .or_default()
174 .insert(*table_id)
175 );
176 }
177 let prev_info = replace(prev_info, new_info);
178 changed_table.insert(*table_id, Some(prev_info));
179 }
180 Entry::Vacant(entry) => {
181 assert!(
182 self.compaction_group_member_tables
183 .entry(new_info.compaction_group_id)
184 .or_default()
185 .insert(*table_id)
186 );
187 has_bumped_committed_epoch = true;
188 entry.insert(new_info);
189 changed_table.insert(*table_id, None);
190 }
191 }
192 }
193 debug_assert_eq!(
194 self.compaction_group_member_tables,
195 Self::build_compaction_group_member_tables(&self.state_table_info)
196 );
197 (changed_table, has_bumped_committed_epoch)
198 }
199
    /// Returns the info of all tracked state tables, keyed by table id.
    pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
        &self.state_table_info
    }
203
204 pub fn compaction_group_member_table_ids(
205 &self,
206 compaction_group_id: CompactionGroupId,
207 ) -> &BTreeSet<TableId> {
208 static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
209 self.compaction_group_member_tables
210 .get(&compaction_group_id)
211 .unwrap_or_else(|| EMPTY_SET.deref())
212 }
213
    /// Returns the whole index from compaction group id to its member table ids.
    pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
        &self.compaction_group_member_tables
    }
217
218 pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
219 self.state_table_info
220 .values()
221 .map(|info| info.committed_epoch)
222 .max()
223 }
224}
225
/// A Hummock version, generic over the SST info type `T` stored in the levels
/// and the SST info type `L` stored in the table change logs (defaults to `T`).
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionCommon<T, L = T> {
    /// Id of this version.
    pub id: HummockVersionId,
    /// Levels of each compaction group, keyed by compaction group id.
    pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
    /// Deprecated version-wide committed epoch; per-table committed epochs
    /// live in `state_table_info`.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    /// Watermarks of each table, keyed by table id.
    pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
    /// Change log of each table, keyed by table id.
    pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
    /// Per-table state info and the compaction group membership index.
    pub state_table_info: HummockVersionStateTableInfo,
}
236
/// Version whose levels and change logs both carry full [`SstableInfo`].
pub type HummockVersion = HummockVersionCommon<SstableInfo>;

/// Version whose change logs carry `()` instead of SST info, i.e. only the
/// epoch metadata of the change logs is retained.
pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
240
241impl Default for HummockVersion {
242 fn default() -> Self {
243 HummockVersion::from(&PbHummockVersion::default())
244 }
245}
246
247impl<T> HummockVersionCommon<T>
248where
249 T: for<'a> From<&'a PbSstableInfo>,
250 PbSstableInfo: for<'a> From<&'a T>,
251{
252 pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
255 pb_version.into()
256 }
257
258 pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
261 pb_version.into()
262 }
263
264 pub fn to_protobuf(&self) -> PbHummockVersion {
265 self.into()
266 }
267}
268
269impl HummockVersion {
270 pub fn estimated_encode_len(&self) -> usize {
271 self.levels.len() * size_of::<CompactionGroupId>()
272 + self
273 .levels
274 .values()
275 .map(|level| level.estimated_encode_len())
276 .sum::<usize>()
277 + self.table_watermarks.len() * size_of::<u32>()
278 + self
279 .table_watermarks
280 .values()
281 .map(|table_watermark| table_watermark.estimated_encode_len())
282 .sum::<usize>()
283 + self
284 .table_change_log
285 .values()
286 .map(|c| {
287 c.iter()
288 .map(|l| {
289 l.old_value
290 .iter()
291 .chain(l.new_value.iter())
292 .map(|s| s.estimated_encode_len())
293 .sum::<usize>()
294 })
295 .sum::<usize>()
296 })
297 .sum::<usize>()
298 }
299}
300
301impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
302where
303 T: for<'a> From<&'a PbSstableInfo>,
304{
305 fn from(pb_version: &PbHummockVersion) -> Self {
306 #[expect(deprecated)]
307 Self {
308 id: HummockVersionId(pb_version.id),
309 levels: pb_version
310 .levels
311 .iter()
312 .map(|(group_id, levels)| {
313 (*group_id as CompactionGroupId, LevelsCommon::from(levels))
314 })
315 .collect(),
316 max_committed_epoch: pb_version.max_committed_epoch,
317 table_watermarks: pb_version
318 .table_watermarks
319 .iter()
320 .map(|(table_id, table_watermark)| {
321 (
322 TableId::new(*table_id),
323 Arc::new(TableWatermarks::from(table_watermark)),
324 )
325 })
326 .collect(),
327 table_change_log: pb_version
328 .table_change_logs
329 .iter()
330 .map(|(table_id, change_log)| {
331 (
332 TableId::new(*table_id),
333 TableChangeLogCommon::from_protobuf(change_log),
334 )
335 })
336 .collect(),
337 state_table_info: HummockVersionStateTableInfo::from_protobuf(
338 &pb_version.state_table_info,
339 ),
340 }
341 }
342}
343
344impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
345where
346 PbSstableInfo: for<'a> From<&'a T>,
347{
348 fn from(version: &HummockVersionCommon<T>) -> Self {
349 #[expect(deprecated)]
350 Self {
351 id: version.id.0,
352 levels: version
353 .levels
354 .iter()
355 .map(|(group_id, levels)| (*group_id as _, levels.into()))
356 .collect(),
357 max_committed_epoch: version.max_committed_epoch,
358 table_watermarks: version
359 .table_watermarks
360 .iter()
361 .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
362 .collect(),
363 table_change_logs: version
364 .table_change_log
365 .iter()
366 .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
367 .collect(),
368 state_table_info: version.state_table_info.to_protobuf(),
369 }
370 }
371}
372
373impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
374where
375 PbSstableInfo: From<T>,
376 PbSstableInfo: for<'a> From<&'a T>,
377{
378 fn from(version: HummockVersionCommon<T>) -> Self {
379 #[expect(deprecated)]
380 Self {
381 id: version.id.0,
382 levels: version
383 .levels
384 .into_iter()
385 .map(|(group_id, levels)| (group_id as _, levels.into()))
386 .collect(),
387 max_committed_epoch: version.max_committed_epoch,
388 table_watermarks: version
389 .table_watermarks
390 .into_iter()
391 .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
392 .collect(),
393 table_change_logs: version
394 .table_change_log
395 .into_iter()
396 .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
397 .collect(),
398 state_table_info: version.state_table_info.to_protobuf(),
399 }
400 }
401}
402
403impl HummockVersion {
    /// Returns the id that follows this version's id, as computed by
    /// `HummockVersionId::next`.
    pub fn next_version_id(&self) -> HummockVersionId {
        self.id.next()
    }
407
408 pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
409 self.state_table_info.state_table_info.is_empty()
411 && self.levels.values().any(|group| {
412 #[expect(deprecated)]
414 !group.member_table_ids.is_empty()
415 })
416 }
417
418 pub fn may_fill_backward_compatible_state_table_info_delta(
419 &self,
420 delta: &mut HummockVersionDelta,
421 ) {
422 #[expect(deprecated)]
423 for (cg_id, group) in &self.levels {
425 for table_id in &group.member_table_ids {
426 assert!(
427 delta
428 .state_table_info_delta
429 .insert(
430 TableId::new(*table_id),
431 StateTableInfoDelta {
432 committed_epoch: self.max_committed_epoch,
433 compaction_group_id: *cg_id,
434 }
435 )
436 .is_none(),
437 "duplicate table id {} in cg {}",
438 table_id,
439 cg_id
440 );
441 }
442 }
443 }
444
445 pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
446 #[expect(deprecated)]
447 let mut init_version = HummockVersion {
448 id: FIRST_VERSION_ID,
449 levels: Default::default(),
450 max_committed_epoch: INVALID_EPOCH,
451 table_watermarks: HashMap::new(),
452 table_change_log: HashMap::new(),
453 state_table_info: HummockVersionStateTableInfo::empty(),
454 };
455 for group_id in [
456 StaticCompactionGroupId::StateDefault as CompactionGroupId,
457 StaticCompactionGroupId::MaterializedView as CompactionGroupId,
458 ] {
459 init_version.levels.insert(
460 group_id,
461 build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
462 );
463 }
464 init_version
465 }
466
467 pub fn version_delta_after(&self) -> HummockVersionDelta {
468 #[expect(deprecated)]
469 HummockVersionDelta {
470 id: self.next_version_id(),
471 prev_id: self.id,
472 trivial_move: false,
473 max_committed_epoch: self.max_committed_epoch,
474 group_deltas: Default::default(),
475 new_table_watermarks: HashMap::new(),
476 removed_table_ids: HashSet::new(),
477 change_log_delta: HashMap::new(),
478 state_table_info_delta: Default::default(),
479 }
480 }
481
482 pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
483 let table_change_log = {
484 let mut table_change_log = HashMap::new();
485 for (table_id, log) in &mut self.table_change_log {
486 let change_log_iter =
487 log.change_log_iter_mut()
488 .map(|item| EpochNewChangeLogCommon {
489 new_value: std::mem::take(&mut item.new_value),
490 old_value: std::mem::take(&mut item.old_value),
491 non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
492 checkpoint_epoch: item.checkpoint_epoch,
493 });
494 table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
495 }
496
497 table_change_log
498 };
499
500 let local_version = LocalHummockVersion::from(self);
501
502 (local_version, table_change_log)
503 }
504}
505
506impl<T, L> HummockVersionCommon<T, L> {
507 pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
508 self.state_table_info
509 .info()
510 .get(&table_id)
511 .map(|info| info.committed_epoch)
512 }
513}
514
/// The difference between two consecutive Hummock versions, generic over the
/// SST info type `T` used in group deltas and the SST info type `L` used in
/// change log deltas (defaults to `T`).
#[derive(Debug, PartialEq, Clone)]
pub struct HummockVersionDeltaCommon<T, L = T> {
    /// Id of the version this delta produces.
    pub id: HummockVersionId,
    /// Id of the version this delta is based on.
    pub prev_id: HummockVersionId,
    /// Deltas of each compaction group, keyed by compaction group id.
    pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
    /// Deprecated version-wide committed epoch; per-table committed epochs are
    /// carried in `state_table_info_delta`.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    /// Flag carried over from the protobuf delta's `trivial_move` field.
    pub trivial_move: bool,
    /// Watermarks added by this delta, keyed by table id.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Ids of the tables removed by this delta.
    pub removed_table_ids: HashSet<TableId>,
    /// Change log deltas, keyed by table id.
    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
    /// State table info updates, keyed by table id.
    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
}
528
/// Version delta whose group deltas and change log deltas both carry full
/// [`SstableInfo`].
pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;

/// Version delta whose change log deltas carry `()` instead of SST info.
pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
532
533impl Default for HummockVersionDelta {
534 fn default() -> Self {
535 HummockVersionDelta::from(&PbHummockVersionDelta::default())
536 }
537}
538
539impl<T> HummockVersionDeltaCommon<T>
540where
541 T: for<'a> From<&'a PbSstableInfo>,
542 PbSstableInfo: for<'a> From<&'a T>,
543{
544 pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
547 delta.into()
548 }
549
550 pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
553 delta.into()
554 }
555
556 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
557 self.into()
558 }
559}
560
/// Implemented by SST info types that can report their SST id.
pub trait SstableIdReader {
    /// Returns the SST id.
    fn sst_id(&self) -> HummockSstableId;
}
564
/// Implemented by SST info types that can report the id of their SST object.
pub trait ObjectIdReader {
    /// Returns the SST object id.
    fn object_id(&self) -> HummockSstableObjectId;
}
568
569impl<T> HummockVersionDeltaCommon<T>
570where
571 T: SstableIdReader + ObjectIdReader,
572{
573 pub fn newly_added_object_ids(
578 &self,
579 exclude_table_change_log: bool,
580 ) -> HashSet<HummockObjectId> {
581 match HummockObjectId::Sstable(0.into()) {
585 HummockObjectId::Sstable(_) => {}
586 };
587 self.newly_added_sst_infos(exclude_table_change_log)
588 .map(|sst| HummockObjectId::Sstable(sst.object_id()))
589 .collect()
590 }
591
592 pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
593 self.newly_added_sst_infos(exclude_table_change_log)
594 .map(|sst| sst.sst_id())
595 .collect()
596 }
597}
598
599impl<T> HummockVersionDeltaCommon<T> {
600 pub fn newly_added_sst_infos(
601 &self,
602 exclude_table_change_log: bool,
603 ) -> impl Iterator<Item = &'_ T> {
604 let may_table_change_delta = if exclude_table_change_log {
605 None
606 } else {
607 Some(self.change_log_delta.values())
608 };
609 self.group_deltas
610 .values()
611 .flat_map(|group_deltas| {
612 group_deltas.group_deltas.iter().flat_map(|group_delta| {
613 let sst_slice = match &group_delta {
614 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
615 | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
616 inserted_table_infos,
617 ..
618 }) => Some(inserted_table_infos.iter()),
619 GroupDeltaCommon::GroupConstruct(_)
620 | GroupDeltaCommon::GroupDestroy(_)
621 | GroupDeltaCommon::GroupMerge(_) => None,
622 };
623 sst_slice.into_iter().flatten()
624 })
625 })
626 .chain(
627 may_table_change_delta
628 .map(|v| {
629 v.flat_map(|delta| {
630 let new_log = &delta.new_log;
631 new_log.new_value.iter().chain(new_log.old_value.iter())
632 })
633 })
634 .into_iter()
635 .flatten(),
636 )
637 }
638}
639
impl HummockVersionDelta {
    /// Returns the deprecated `max_committed_epoch` of this delta.
    ///
    /// NOTE(review): judging by the name, this accessor is only meant for
    /// migration code paths — confirm against callers.
    #[expect(deprecated)]
    pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
        self.max_committed_epoch
    }
}
646
647impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
648where
649 T: for<'a> From<&'a PbSstableInfo>,
650{
651 fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
652 #[expect(deprecated)]
653 Self {
654 id: HummockVersionId(pb_version_delta.id),
655 prev_id: HummockVersionId(pb_version_delta.prev_id),
656 group_deltas: pb_version_delta
657 .group_deltas
658 .iter()
659 .map(|(group_id, deltas)| {
660 (
661 *group_id as CompactionGroupId,
662 GroupDeltasCommon::from(deltas),
663 )
664 })
665 .collect(),
666 max_committed_epoch: pb_version_delta.max_committed_epoch,
667 trivial_move: pb_version_delta.trivial_move,
668 new_table_watermarks: pb_version_delta
669 .new_table_watermarks
670 .iter()
671 .map(|(table_id, watermarks)| {
672 (TableId::new(*table_id), TableWatermarks::from(watermarks))
673 })
674 .collect(),
675 removed_table_ids: pb_version_delta
676 .removed_table_ids
677 .iter()
678 .map(|table_id| TableId::new(*table_id))
679 .collect(),
680 change_log_delta: pb_version_delta
681 .change_log_delta
682 .iter()
683 .map(|(table_id, log_delta)| {
684 (
685 TableId::new(*table_id),
686 ChangeLogDeltaCommon {
687 truncate_epoch: log_delta.truncate_epoch,
688 new_log: log_delta.new_log.as_ref().unwrap().into(),
689 },
690 )
691 })
692 .collect(),
693
694 state_table_info_delta: pb_version_delta
695 .state_table_info_delta
696 .iter()
697 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
698 .collect(),
699 }
700 }
701}
702
703impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
704where
705 PbSstableInfo: for<'a> From<&'a T>,
706{
707 fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
708 #[expect(deprecated)]
709 Self {
710 id: version_delta.id.0,
711 prev_id: version_delta.prev_id.0,
712 group_deltas: version_delta
713 .group_deltas
714 .iter()
715 .map(|(group_id, deltas)| (*group_id as _, deltas.into()))
716 .collect(),
717 max_committed_epoch: version_delta.max_committed_epoch,
718 trivial_move: version_delta.trivial_move,
719 new_table_watermarks: version_delta
720 .new_table_watermarks
721 .iter()
722 .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
723 .collect(),
724 removed_table_ids: version_delta
725 .removed_table_ids
726 .iter()
727 .map(|table_id| table_id.table_id)
728 .collect(),
729 change_log_delta: version_delta
730 .change_log_delta
731 .iter()
732 .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
733 .collect(),
734 state_table_info_delta: version_delta
735 .state_table_info_delta
736 .iter()
737 .map(|(table_id, delta)| (table_id.table_id, *delta))
738 .collect(),
739 }
740 }
741}
742
743impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
744where
745 PbSstableInfo: From<T>,
746{
747 fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
748 #[expect(deprecated)]
749 Self {
750 id: version_delta.id.0,
751 prev_id: version_delta.prev_id.0,
752 group_deltas: version_delta
753 .group_deltas
754 .into_iter()
755 .map(|(group_id, deltas)| (group_id as _, deltas.into()))
756 .collect(),
757 max_committed_epoch: version_delta.max_committed_epoch,
758 trivial_move: version_delta.trivial_move,
759 new_table_watermarks: version_delta
760 .new_table_watermarks
761 .into_iter()
762 .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
763 .collect(),
764 removed_table_ids: version_delta
765 .removed_table_ids
766 .into_iter()
767 .map(|table_id| table_id.table_id)
768 .collect(),
769 change_log_delta: version_delta
770 .change_log_delta
771 .into_iter()
772 .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
773 .collect(),
774 state_table_info_delta: version_delta
775 .state_table_info_delta
776 .into_iter()
777 .map(|(table_id, delta)| (table_id.table_id, delta))
778 .collect(),
779 }
780 }
781}
782
783impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
784where
785 T: From<PbSstableInfo>,
786{
787 fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
788 #[expect(deprecated)]
789 Self {
790 id: HummockVersionId(pb_version_delta.id),
791 prev_id: HummockVersionId(pb_version_delta.prev_id),
792 group_deltas: pb_version_delta
793 .group_deltas
794 .into_iter()
795 .map(|(group_id, deltas)| (group_id as CompactionGroupId, deltas.into()))
796 .collect(),
797 max_committed_epoch: pb_version_delta.max_committed_epoch,
798 trivial_move: pb_version_delta.trivial_move,
799 new_table_watermarks: pb_version_delta
800 .new_table_watermarks
801 .into_iter()
802 .map(|(table_id, watermarks)| (TableId::new(table_id), watermarks.into()))
803 .collect(),
804 removed_table_ids: pb_version_delta
805 .removed_table_ids
806 .into_iter()
807 .map(TableId::new)
808 .collect(),
809 change_log_delta: pb_version_delta
810 .change_log_delta
811 .iter()
812 .map(|(table_id, log_delta)| {
813 (
814 TableId::new(*table_id),
815 ChangeLogDeltaCommon {
816 new_log: log_delta.new_log.clone().unwrap().into(),
817 truncate_epoch: log_delta.truncate_epoch,
818 },
819 )
820 })
821 .collect(),
822 state_table_info_delta: pb_version_delta
823 .state_table_info_delta
824 .iter()
825 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
826 .collect(),
827 }
828 }
829}
830
/// A change applied within a single level of a compaction group: a set of
/// SSTs to remove and a list of SSTs to insert.
#[derive(Debug, PartialEq, Clone)]
pub struct IntraLevelDeltaCommon<T> {
    /// Index of the level this delta applies to.
    pub level_idx: u32,
    /// Id of the L0 sub-level concerned. NOTE(review): presumably only
    /// meaningful when `level_idx` refers to L0 — confirm against the applier.
    pub l0_sub_level_id: u64,
    /// Ids of the SSTs to remove. Despite the field name, these are SST ids,
    /// not state table ids.
    pub removed_table_ids: HashSet<HummockSstableId>,
    /// Infos of the SSTs to insert.
    pub inserted_table_infos: Vec<T>,
    /// Mirrors the `vnode_partition_count` field of `PbIntraLevelDelta`.
    pub vnode_partition_count: u32,
    /// Mirrors the `compaction_group_version_id` field of `PbIntraLevelDelta`.
    pub compaction_group_version_id: u64,
}
840
/// Intra-level delta carrying full [`SstableInfo`].
pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
842
843impl IntraLevelDelta {
844 pub fn estimated_encode_len(&self) -> usize {
845 size_of::<u32>()
846 + size_of::<u64>()
847 + self.removed_table_ids.len() * size_of::<u32>()
848 + self
849 .inserted_table_infos
850 .iter()
851 .map(|sst| sst.estimated_encode_len())
852 .sum::<usize>()
853 + size_of::<u32>()
854 }
855}
856
857impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
858where
859 T: From<PbSstableInfo>,
860{
861 fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
862 Self {
863 level_idx: pb_intra_level_delta.level_idx,
864 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
865 removed_table_ids: HashSet::from_iter(
866 pb_intra_level_delta
867 .removed_table_ids
868 .iter()
869 .map(|sst_id| (*sst_id).into()),
870 ),
871 inserted_table_infos: pb_intra_level_delta
872 .inserted_table_infos
873 .into_iter()
874 .map(Into::into)
875 .collect_vec(),
876 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
877 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
878 }
879 }
880}
881
882impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
883where
884 PbSstableInfo: From<T>,
885{
886 fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
887 Self {
888 level_idx: intra_level_delta.level_idx,
889 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
890 removed_table_ids: intra_level_delta
891 .removed_table_ids
892 .into_iter()
893 .map(|sst_id| sst_id.inner())
894 .collect(),
895 inserted_table_infos: intra_level_delta
896 .inserted_table_infos
897 .into_iter()
898 .map(Into::into)
899 .collect_vec(),
900 vnode_partition_count: intra_level_delta.vnode_partition_count,
901 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
902 }
903 }
904}
905
906impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
907where
908 PbSstableInfo: for<'a> From<&'a T>,
909{
910 fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
911 Self {
912 level_idx: intra_level_delta.level_idx,
913 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
914 removed_table_ids: intra_level_delta
915 .removed_table_ids
916 .iter()
917 .map(|sst_id| sst_id.inner())
918 .collect(),
919 inserted_table_infos: intra_level_delta
920 .inserted_table_infos
921 .iter()
922 .map(Into::into)
923 .collect_vec(),
924 vnode_partition_count: intra_level_delta.vnode_partition_count,
925 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
926 }
927 }
928}
929
930impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
931where
932 T: for<'a> From<&'a PbSstableInfo>,
933{
934 fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
935 Self {
936 level_idx: pb_intra_level_delta.level_idx,
937 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
938 removed_table_ids: HashSet::from_iter(
939 pb_intra_level_delta
940 .removed_table_ids
941 .iter()
942 .map(|sst_id| (*sst_id).into()),
943 ),
944 inserted_table_infos: pb_intra_level_delta
945 .inserted_table_infos
946 .iter()
947 .map(Into::into)
948 .collect_vec(),
949 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
950 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
951 }
952 }
953}
954
955impl IntraLevelDelta {
956 pub fn new(
957 level_idx: u32,
958 l0_sub_level_id: u64,
959 removed_table_ids: HashSet<HummockSstableId>,
960 inserted_table_infos: Vec<SstableInfo>,
961 vnode_partition_count: u32,
962 compaction_group_version_id: u64,
963 ) -> Self {
964 Self {
965 level_idx,
966 l0_sub_level_id,
967 removed_table_ids,
968 inserted_table_infos,
969 vnode_partition_count,
970 compaction_group_version_id,
971 }
972 }
973}
974
/// A single change applied to a compaction group, mirroring the `delta_type`
/// oneof of `PbGroupDelta`.
#[derive(Debug, PartialEq, Clone)]
pub enum GroupDeltaCommon<T> {
    /// Adds a new L0 sub-level made of the given SSTs.
    NewL0SubLevel(Vec<T>),
    /// Removes and/or inserts SSTs within one level.
    IntraLevel(IntraLevelDeltaCommon<T>),
    /// Carries a `PbGroupConstruct` message; boxed to keep the enum small.
    GroupConstruct(Box<PbGroupConstruct>),
    /// Carries a `PbGroupDestroy` message.
    GroupDestroy(PbGroupDestroy),
    /// Carries a `PbGroupMerge` message.
    GroupMerge(PbGroupMerge),
}
983
/// Group delta carrying full [`SstableInfo`].
pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
985
986impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
987where
988 T: From<PbSstableInfo>,
989{
990 fn from(pb_group_delta: PbGroupDelta) -> Self {
991 match pb_group_delta.delta_type {
992 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
993 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
994 }
995 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
996 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
997 }
998 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
999 GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1000 }
1001 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1002 GroupDeltaCommon::GroupMerge(pb_group_merge)
1003 }
1004 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1005 pb_new_sub_level
1006 .inserted_table_infos
1007 .into_iter()
1008 .map(T::from)
1009 .collect(),
1010 ),
1011 None => panic!("delta_type is not set"),
1012 }
1013 }
1014}
1015
1016impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1017where
1018 PbSstableInfo: From<T>,
1019{
1020 fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1021 match group_delta {
1022 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1023 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1024 },
1025 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1026 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1027 },
1028 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1029 delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1030 },
1031 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1032 delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1033 },
1034 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1035 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1036 inserted_table_infos: new_sub_level
1037 .into_iter()
1038 .map(PbSstableInfo::from)
1039 .collect(),
1040 })),
1041 },
1042 }
1043 }
1044}
1045
1046impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1047where
1048 PbSstableInfo: for<'a> From<&'a T>,
1049{
1050 fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1051 match group_delta {
1052 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1053 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1054 },
1055 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1056 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1057 },
1058 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1059 delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1060 },
1061 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1062 delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1063 },
1064 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1065 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1066 inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1067 })),
1068 },
1069 }
1070 }
1071}
1072
1073impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1074where
1075 T: for<'a> From<&'a PbSstableInfo>,
1076{
1077 fn from(pb_group_delta: &PbGroupDelta) -> Self {
1078 match &pb_group_delta.delta_type {
1079 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1080 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1081 }
1082 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1083 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1084 }
1085 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1086 GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1087 }
1088 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1089 GroupDeltaCommon::GroupMerge(*pb_group_merge)
1090 }
1091 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1092 pb_new_sub_level
1093 .inserted_table_infos
1094 .iter()
1095 .map(T::from)
1096 .collect(),
1097 ),
1098 None => panic!("delta_type is not set"),
1099 }
1100 }
1101}
1102
/// The ordered list of deltas applied to one compaction group.
#[derive(Debug, PartialEq, Clone)]
pub struct GroupDeltasCommon<T> {
    /// The deltas, in the order they are stored in `PbGroupDeltas`.
    pub group_deltas: Vec<GroupDeltaCommon<T>>,
}
1107
1108impl<T> Default for GroupDeltasCommon<T> {
1109 fn default() -> Self {
1110 Self {
1111 group_deltas: vec![],
1112 }
1113 }
1114}
1115
/// Group deltas carrying full [`SstableInfo`].
pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1117
1118impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1119where
1120 T: From<PbSstableInfo>,
1121{
1122 fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1123 Self {
1124 group_deltas: pb_group_deltas
1125 .group_deltas
1126 .into_iter()
1127 .map(GroupDeltaCommon::from)
1128 .collect_vec(),
1129 }
1130 }
1131}
1132
1133impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1134where
1135 PbSstableInfo: From<T>,
1136{
1137 fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1138 Self {
1139 group_deltas: group_deltas
1140 .group_deltas
1141 .into_iter()
1142 .map(|group_delta| group_delta.into())
1143 .collect_vec(),
1144 }
1145 }
1146}
1147
1148impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1149where
1150 PbSstableInfo: for<'a> From<&'a T>,
1151{
1152 fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1153 Self {
1154 group_deltas: group_deltas
1155 .group_deltas
1156 .iter()
1157 .map(|group_delta| group_delta.into())
1158 .collect_vec(),
1159 }
1160 }
1161}
1162
1163impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1164where
1165 T: for<'a> From<&'a PbSstableInfo>,
1166{
1167 fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1168 Self {
1169 group_deltas: pb_group_deltas
1170 .group_deltas
1171 .iter()
1172 .map(GroupDeltaCommon::from)
1173 .collect_vec(),
1174 }
1175 }
1176}
1177
1178impl<T> GroupDeltasCommon<T>
1179where
1180 PbSstableInfo: for<'a> From<&'a T>,
1181{
1182 pub fn to_protobuf(&self) -> PbGroupDeltas {
1183 self.into()
1184 }
1185}
1186
1187impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1188 #[expect(deprecated)]
1189 fn from(delta: HummockVersionDelta) -> Self {
1190 Self {
1191 id: delta.id,
1192 prev_id: delta.prev_id,
1193 group_deltas: delta.group_deltas,
1194 max_committed_epoch: delta.max_committed_epoch,
1195 trivial_move: delta.trivial_move,
1196 new_table_watermarks: delta.new_table_watermarks,
1197 removed_table_ids: delta.removed_table_ids,
1198 change_log_delta: delta
1199 .change_log_delta
1200 .into_iter()
1201 .map(|(k, v)| {
1202 (
1203 k,
1204 ChangeLogDeltaCommon {
1205 truncate_epoch: v.truncate_epoch,
1206 new_log: EpochNewChangeLogCommon {
1207 new_value: Vec::new(),
1208 old_value: Vec::new(),
1209 non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1210 checkpoint_epoch: v.new_log.checkpoint_epoch,
1211 },
1212 },
1213 )
1214 })
1215 .collect(),
1216 state_table_info_delta: delta.state_table_info_delta,
1217 }
1218 }
1219}
1220
1221impl From<HummockVersion> for LocalHummockVersion {
1222 #[expect(deprecated)]
1223 fn from(version: HummockVersion) -> Self {
1224 Self {
1225 id: version.id,
1226 levels: version.levels,
1227 max_committed_epoch: version.max_committed_epoch,
1228 table_watermarks: version.table_watermarks,
1229 table_change_log: version
1230 .table_change_log
1231 .into_iter()
1232 .map(|(k, v)| {
1233 let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1234 .change_log_into_iter()
1235 .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1236 new_value: Vec::new(),
1237 old_value: Vec::new(),
1238 non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1239 checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1240 })
1241 .collect();
1242 (k, TableChangeLogCommon::new(epoch_new_change_logs))
1243 })
1244 .collect(),
1245 state_table_info: version.state_table_info,
1246 }
1247 }
1248}