1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::{
27 CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge,
28 PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbNewL0SubLevel, PbSstableInfo,
29 PbStateTableInfo, StateTableInfo, StateTableInfoDelta,
30};
31use tracing::warn;
32
33use crate::change_log::{
34 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
35};
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
38use crate::level::LevelsCommon;
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::TableWatermarks;
41use crate::{
42 CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockSstableId, HummockSstableObjectId,
43 HummockVersionId,
44};
45
46#[derive(Debug, Clone, PartialEq)]
47pub struct HummockVersionStateTableInfo {
48 state_table_info: HashMap<TableId, PbStateTableInfo>,
49
50 compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
52}
53
54impl HummockVersionStateTableInfo {
55 pub fn empty() -> Self {
56 Self {
57 state_table_info: HashMap::new(),
58 compaction_group_member_tables: HashMap::new(),
59 }
60 }
61
62 fn build_compaction_group_member_tables(
63 state_table_info: &HashMap<TableId, PbStateTableInfo>,
64 ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
65 let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
66 for (table_id, info) in state_table_info {
67 assert!(
68 ret.entry(info.compaction_group_id)
69 .or_default()
70 .insert(*table_id)
71 );
72 }
73 ret
74 }
75
76 pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
77 self.state_table_info
78 .iter()
79 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
80 .collect()
81 }
82
83 pub fn from_protobuf(state_table_info: &HashMap<u32, PbStateTableInfo>) -> Self {
84 let state_table_info = state_table_info
85 .iter()
86 .map(|(table_id, info)| (TableId::new(*table_id), *info))
87 .collect();
88 let compaction_group_member_tables =
89 Self::build_compaction_group_member_tables(&state_table_info);
90 Self {
91 state_table_info,
92 compaction_group_member_tables,
93 }
94 }
95
96 pub fn to_protobuf(&self) -> HashMap<u32, PbStateTableInfo> {
97 self.state_table_info
98 .iter()
99 .map(|(table_id, info)| (table_id.table_id, *info))
100 .collect()
101 }
102
103 pub fn apply_delta(
104 &mut self,
105 delta: &HashMap<TableId, StateTableInfoDelta>,
106 removed_table_id: &HashSet<TableId>,
107 ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
108 let mut changed_table = HashMap::new();
109 let mut has_bumped_committed_epoch = false;
110 fn remove_table_from_compaction_group(
111 compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
112 compaction_group_id: CompactionGroupId,
113 table_id: TableId,
114 ) {
115 let member_tables = compaction_group_member_tables
116 .get_mut(&compaction_group_id)
117 .expect("should exist");
118 assert!(member_tables.remove(&table_id));
119 if member_tables.is_empty() {
120 assert!(
121 compaction_group_member_tables
122 .remove(&compaction_group_id)
123 .is_some()
124 );
125 }
126 }
127 for table_id in removed_table_id {
128 if let Some(prev_info) = self.state_table_info.remove(table_id) {
129 remove_table_from_compaction_group(
130 &mut self.compaction_group_member_tables,
131 prev_info.compaction_group_id,
132 *table_id,
133 );
134 assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
135 } else {
136 warn!(
137 table_id = table_id.table_id,
138 "table to remove does not exist"
139 );
140 }
141 }
142 for (table_id, delta) in delta {
143 if removed_table_id.contains(table_id) {
144 continue;
145 }
146 let new_info = StateTableInfo {
147 committed_epoch: delta.committed_epoch,
148 compaction_group_id: delta.compaction_group_id,
149 };
150 match self.state_table_info.entry(*table_id) {
151 Entry::Occupied(mut entry) => {
152 let prev_info = entry.get_mut();
153 assert!(
154 new_info.committed_epoch >= prev_info.committed_epoch,
155 "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
156 table_id.table_id,
157 prev_info,
158 new_info
159 );
160 if new_info.committed_epoch > prev_info.committed_epoch {
161 has_bumped_committed_epoch = true;
162 }
163 if prev_info.compaction_group_id != new_info.compaction_group_id {
164 remove_table_from_compaction_group(
166 &mut self.compaction_group_member_tables,
167 prev_info.compaction_group_id,
168 *table_id,
169 );
170 assert!(
171 self.compaction_group_member_tables
172 .entry(new_info.compaction_group_id)
173 .or_default()
174 .insert(*table_id)
175 );
176 }
177 let prev_info = replace(prev_info, new_info);
178 changed_table.insert(*table_id, Some(prev_info));
179 }
180 Entry::Vacant(entry) => {
181 assert!(
182 self.compaction_group_member_tables
183 .entry(new_info.compaction_group_id)
184 .or_default()
185 .insert(*table_id)
186 );
187 has_bumped_committed_epoch = true;
188 entry.insert(new_info);
189 changed_table.insert(*table_id, None);
190 }
191 }
192 }
193 debug_assert_eq!(
194 self.compaction_group_member_tables,
195 Self::build_compaction_group_member_tables(&self.state_table_info)
196 );
197 (changed_table, has_bumped_committed_epoch)
198 }
199
200 pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
201 &self.state_table_info
202 }
203
204 pub fn compaction_group_member_table_ids(
205 &self,
206 compaction_group_id: CompactionGroupId,
207 ) -> &BTreeSet<TableId> {
208 static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
209 self.compaction_group_member_tables
210 .get(&compaction_group_id)
211 .unwrap_or_else(|| EMPTY_SET.deref())
212 }
213
214 pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
215 &self.compaction_group_member_tables
216 }
217
218 pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
219 self.state_table_info
220 .values()
221 .map(|info| info.committed_epoch)
222 .max()
223 }
224}
225
226#[derive(Debug, Clone, PartialEq)]
227pub struct HummockVersionCommon<T, L = T> {
228 pub id: HummockVersionId,
229 pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
230 #[deprecated]
231 pub(crate) max_committed_epoch: u64,
232 pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
233 pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
234 pub state_table_info: HummockVersionStateTableInfo,
235}
236
237pub type HummockVersion = HummockVersionCommon<SstableInfo>;
238
239pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
240
241impl Default for HummockVersion {
242 fn default() -> Self {
243 HummockVersion::from(&PbHummockVersion::default())
244 }
245}
246
247impl<T> HummockVersionCommon<T>
248where
249 T: for<'a> From<&'a PbSstableInfo>,
250 PbSstableInfo: for<'a> From<&'a T>,
251{
252 pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
255 pb_version.into()
256 }
257
258 pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
261 pb_version.into()
262 }
263
264 pub fn to_protobuf(&self) -> PbHummockVersion {
265 self.into()
266 }
267}
268
269impl HummockVersion {
270 pub fn estimated_encode_len(&self) -> usize {
271 self.levels.len() * size_of::<CompactionGroupId>()
272 + self
273 .levels
274 .values()
275 .map(|level| level.estimated_encode_len())
276 .sum::<usize>()
277 + self.table_watermarks.len() * size_of::<u32>()
278 + self
279 .table_watermarks
280 .values()
281 .map(|table_watermark| table_watermark.estimated_encode_len())
282 .sum::<usize>()
283 + self
284 .table_change_log
285 .values()
286 .map(|c| {
287 c.iter()
288 .map(|l| {
289 l.old_value
290 .iter()
291 .chain(l.new_value.iter())
292 .map(|s| s.estimated_encode_len())
293 .sum::<usize>()
294 })
295 .sum::<usize>()
296 })
297 .sum::<usize>()
298 }
299}
300
301impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
302where
303 T: for<'a> From<&'a PbSstableInfo>,
304{
305 fn from(pb_version: &PbHummockVersion) -> Self {
306 #[expect(deprecated)]
307 Self {
308 id: HummockVersionId(pb_version.id),
309 levels: pb_version
310 .levels
311 .iter()
312 .map(|(group_id, levels)| {
313 (*group_id as CompactionGroupId, LevelsCommon::from(levels))
314 })
315 .collect(),
316 max_committed_epoch: pb_version.max_committed_epoch,
317 table_watermarks: pb_version
318 .table_watermarks
319 .iter()
320 .map(|(table_id, table_watermark)| {
321 (
322 TableId::new(*table_id),
323 Arc::new(TableWatermarks::from(table_watermark)),
324 )
325 })
326 .collect(),
327 table_change_log: pb_version
328 .table_change_logs
329 .iter()
330 .map(|(table_id, change_log)| {
331 (
332 TableId::new(*table_id),
333 TableChangeLogCommon::from_protobuf(change_log),
334 )
335 })
336 .collect(),
337 state_table_info: HummockVersionStateTableInfo::from_protobuf(
338 &pb_version.state_table_info,
339 ),
340 }
341 }
342}
343
344impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
345where
346 PbSstableInfo: for<'a> From<&'a T>,
347{
348 fn from(version: &HummockVersionCommon<T>) -> Self {
349 #[expect(deprecated)]
350 Self {
351 id: version.id.0,
352 levels: version
353 .levels
354 .iter()
355 .map(|(group_id, levels)| (*group_id as _, levels.into()))
356 .collect(),
357 max_committed_epoch: version.max_committed_epoch,
358 table_watermarks: version
359 .table_watermarks
360 .iter()
361 .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
362 .collect(),
363 table_change_logs: version
364 .table_change_log
365 .iter()
366 .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
367 .collect(),
368 state_table_info: version.state_table_info.to_protobuf(),
369 }
370 }
371}
372
373impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
374where
375 PbSstableInfo: From<T>,
376 PbSstableInfo: for<'a> From<&'a T>,
377{
378 fn from(version: HummockVersionCommon<T>) -> Self {
379 #[expect(deprecated)]
380 Self {
381 id: version.id.0,
382 levels: version
383 .levels
384 .into_iter()
385 .map(|(group_id, levels)| (group_id as _, levels.into()))
386 .collect(),
387 max_committed_epoch: version.max_committed_epoch,
388 table_watermarks: version
389 .table_watermarks
390 .into_iter()
391 .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
392 .collect(),
393 table_change_logs: version
394 .table_change_log
395 .into_iter()
396 .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
397 .collect(),
398 state_table_info: version.state_table_info.to_protobuf(),
399 }
400 }
401}
402
403impl HummockVersion {
404 pub fn next_version_id(&self) -> HummockVersionId {
405 self.id.next()
406 }
407
408 pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
409 self.state_table_info.state_table_info.is_empty()
411 && self.levels.values().any(|group| {
412 #[expect(deprecated)]
414 !group.member_table_ids.is_empty()
415 })
416 }
417
418 pub fn may_fill_backward_compatible_state_table_info_delta(
419 &self,
420 delta: &mut HummockVersionDelta,
421 ) {
422 #[expect(deprecated)]
423 for (cg_id, group) in &self.levels {
425 for table_id in &group.member_table_ids {
426 assert!(
427 delta
428 .state_table_info_delta
429 .insert(
430 TableId::new(*table_id),
431 StateTableInfoDelta {
432 committed_epoch: self.max_committed_epoch,
433 compaction_group_id: *cg_id,
434 }
435 )
436 .is_none(),
437 "duplicate table id {} in cg {}",
438 table_id,
439 cg_id
440 );
441 }
442 }
443 }
444
445 pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
446 #[expect(deprecated)]
447 let mut init_version = HummockVersion {
448 id: FIRST_VERSION_ID,
449 levels: Default::default(),
450 max_committed_epoch: INVALID_EPOCH,
451 table_watermarks: HashMap::new(),
452 table_change_log: HashMap::new(),
453 state_table_info: HummockVersionStateTableInfo::empty(),
454 };
455 for group_id in [
456 StaticCompactionGroupId::StateDefault as CompactionGroupId,
457 StaticCompactionGroupId::MaterializedView as CompactionGroupId,
458 ] {
459 init_version.levels.insert(
460 group_id,
461 build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
462 );
463 }
464 init_version
465 }
466
467 pub fn version_delta_after(&self) -> HummockVersionDelta {
468 #[expect(deprecated)]
469 HummockVersionDelta {
470 id: self.next_version_id(),
471 prev_id: self.id,
472 trivial_move: false,
473 max_committed_epoch: self.max_committed_epoch,
474 group_deltas: Default::default(),
475 new_table_watermarks: HashMap::new(),
476 removed_table_ids: HashSet::new(),
477 change_log_delta: HashMap::new(),
478 state_table_info_delta: Default::default(),
479 }
480 }
481
482 pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
483 let table_change_log = {
484 let mut table_change_log = HashMap::new();
485 for (table_id, log) in &mut self.table_change_log {
486 let change_log_iter =
487 log.change_log_iter_mut()
488 .map(|item| EpochNewChangeLogCommon {
489 new_value: std::mem::take(&mut item.new_value),
490 old_value: std::mem::take(&mut item.old_value),
491 epochs: item.epochs.clone(),
492 });
493 table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
494 }
495
496 table_change_log
497 };
498
499 let local_version = LocalHummockVersion::from(self);
500
501 (local_version, table_change_log)
502 }
503}
504
505impl<T, L> HummockVersionCommon<T, L> {
506 pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
507 self.state_table_info
508 .info()
509 .get(&table_id)
510 .map(|info| info.committed_epoch)
511 }
512}
513
514#[derive(Debug, PartialEq, Clone)]
515pub struct HummockVersionDeltaCommon<T, L = T> {
516 pub id: HummockVersionId,
517 pub prev_id: HummockVersionId,
518 pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
519 #[deprecated]
520 pub(crate) max_committed_epoch: u64,
521 pub trivial_move: bool,
522 pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
523 pub removed_table_ids: HashSet<TableId>,
524 pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
525 pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
526}
527
528pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;
529
530pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
531
532impl Default for HummockVersionDelta {
533 fn default() -> Self {
534 HummockVersionDelta::from(&PbHummockVersionDelta::default())
535 }
536}
537
538impl<T> HummockVersionDeltaCommon<T>
539where
540 T: for<'a> From<&'a PbSstableInfo>,
541 PbSstableInfo: for<'a> From<&'a T>,
542{
543 pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
546 delta.into()
547 }
548
549 pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
552 delta.into()
553 }
554
555 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
556 self.into()
557 }
558}
559
560pub trait SstableIdReader {
561 fn sst_id(&self) -> HummockSstableId;
562}
563
564pub trait ObjectIdReader {
565 fn object_id(&self) -> HummockSstableObjectId;
566}
567
568impl<T> HummockVersionDeltaCommon<T>
569where
570 T: SstableIdReader + ObjectIdReader,
571{
572 pub fn newly_added_object_ids(
577 &self,
578 exclude_table_change_log: bool,
579 ) -> HashSet<HummockSstableObjectId> {
580 self.newly_added_sst_infos(exclude_table_change_log)
581 .map(|sst| sst.object_id())
582 .collect()
583 }
584
585 pub fn newly_added_sst_ids(
586 &self,
587 exclude_table_change_log: bool,
588 ) -> HashSet<HummockSstableObjectId> {
589 self.newly_added_sst_infos(exclude_table_change_log)
590 .map(|sst| sst.sst_id())
591 .collect()
592 }
593
594 pub fn newly_added_sst_infos(
595 &self,
596 exclude_table_change_log: bool,
597 ) -> impl Iterator<Item = &'_ T> {
598 let may_table_change_delta = if exclude_table_change_log {
599 None
600 } else {
601 Some(self.change_log_delta.values())
602 };
603 self.group_deltas
604 .values()
605 .flat_map(|group_deltas| {
606 group_deltas.group_deltas.iter().flat_map(|group_delta| {
607 let sst_slice = match &group_delta {
608 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
609 | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
610 inserted_table_infos,
611 ..
612 }) => Some(inserted_table_infos.iter()),
613 GroupDeltaCommon::GroupConstruct(_)
614 | GroupDeltaCommon::GroupDestroy(_)
615 | GroupDeltaCommon::GroupMerge(_) => None,
616 };
617 sst_slice.into_iter().flatten()
618 })
619 })
620 .chain(
621 may_table_change_delta
622 .map(|v| {
623 v.flat_map(|delta| {
624 let new_log = &delta.new_log;
625 new_log.new_value.iter().chain(new_log.old_value.iter())
626 })
627 })
628 .into_iter()
629 .flatten(),
630 )
631 }
632}
633
634impl HummockVersionDelta {
635 #[expect(deprecated)]
636 pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
637 self.max_committed_epoch
638 }
639}
640
641impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
642where
643 T: for<'a> From<&'a PbSstableInfo>,
644{
645 fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
646 #[expect(deprecated)]
647 Self {
648 id: HummockVersionId(pb_version_delta.id),
649 prev_id: HummockVersionId(pb_version_delta.prev_id),
650 group_deltas: pb_version_delta
651 .group_deltas
652 .iter()
653 .map(|(group_id, deltas)| {
654 (
655 *group_id as CompactionGroupId,
656 GroupDeltasCommon::from(deltas),
657 )
658 })
659 .collect(),
660 max_committed_epoch: pb_version_delta.max_committed_epoch,
661 trivial_move: pb_version_delta.trivial_move,
662 new_table_watermarks: pb_version_delta
663 .new_table_watermarks
664 .iter()
665 .map(|(table_id, watermarks)| {
666 (TableId::new(*table_id), TableWatermarks::from(watermarks))
667 })
668 .collect(),
669 removed_table_ids: pb_version_delta
670 .removed_table_ids
671 .iter()
672 .map(|table_id| TableId::new(*table_id))
673 .collect(),
674 change_log_delta: pb_version_delta
675 .change_log_delta
676 .iter()
677 .map(|(table_id, log_delta)| {
678 (
679 TableId::new(*table_id),
680 ChangeLogDeltaCommon {
681 truncate_epoch: log_delta.truncate_epoch,
682 new_log: log_delta.new_log.as_ref().unwrap().into(),
683 },
684 )
685 })
686 .collect(),
687
688 state_table_info_delta: pb_version_delta
689 .state_table_info_delta
690 .iter()
691 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
692 .collect(),
693 }
694 }
695}
696
697impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
698where
699 PbSstableInfo: for<'a> From<&'a T>,
700{
701 fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
702 #[expect(deprecated)]
703 Self {
704 id: version_delta.id.0,
705 prev_id: version_delta.prev_id.0,
706 group_deltas: version_delta
707 .group_deltas
708 .iter()
709 .map(|(group_id, deltas)| (*group_id as _, deltas.into()))
710 .collect(),
711 max_committed_epoch: version_delta.max_committed_epoch,
712 trivial_move: version_delta.trivial_move,
713 new_table_watermarks: version_delta
714 .new_table_watermarks
715 .iter()
716 .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
717 .collect(),
718 removed_table_ids: version_delta
719 .removed_table_ids
720 .iter()
721 .map(|table_id| table_id.table_id)
722 .collect(),
723 change_log_delta: version_delta
724 .change_log_delta
725 .iter()
726 .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
727 .collect(),
728 state_table_info_delta: version_delta
729 .state_table_info_delta
730 .iter()
731 .map(|(table_id, delta)| (table_id.table_id, *delta))
732 .collect(),
733 }
734 }
735}
736
737impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
738where
739 PbSstableInfo: From<T>,
740{
741 fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
742 #[expect(deprecated)]
743 Self {
744 id: version_delta.id.0,
745 prev_id: version_delta.prev_id.0,
746 group_deltas: version_delta
747 .group_deltas
748 .into_iter()
749 .map(|(group_id, deltas)| (group_id as _, deltas.into()))
750 .collect(),
751 max_committed_epoch: version_delta.max_committed_epoch,
752 trivial_move: version_delta.trivial_move,
753 new_table_watermarks: version_delta
754 .new_table_watermarks
755 .into_iter()
756 .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
757 .collect(),
758 removed_table_ids: version_delta
759 .removed_table_ids
760 .into_iter()
761 .map(|table_id| table_id.table_id)
762 .collect(),
763 change_log_delta: version_delta
764 .change_log_delta
765 .into_iter()
766 .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
767 .collect(),
768 state_table_info_delta: version_delta
769 .state_table_info_delta
770 .into_iter()
771 .map(|(table_id, delta)| (table_id.table_id, delta))
772 .collect(),
773 }
774 }
775}
776
777impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
778where
779 T: From<PbSstableInfo>,
780{
781 fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
782 #[expect(deprecated)]
783 Self {
784 id: HummockVersionId(pb_version_delta.id),
785 prev_id: HummockVersionId(pb_version_delta.prev_id),
786 group_deltas: pb_version_delta
787 .group_deltas
788 .into_iter()
789 .map(|(group_id, deltas)| (group_id as CompactionGroupId, deltas.into()))
790 .collect(),
791 max_committed_epoch: pb_version_delta.max_committed_epoch,
792 trivial_move: pb_version_delta.trivial_move,
793 new_table_watermarks: pb_version_delta
794 .new_table_watermarks
795 .into_iter()
796 .map(|(table_id, watermarks)| (TableId::new(table_id), watermarks.into()))
797 .collect(),
798 removed_table_ids: pb_version_delta
799 .removed_table_ids
800 .into_iter()
801 .map(TableId::new)
802 .collect(),
803 change_log_delta: pb_version_delta
804 .change_log_delta
805 .iter()
806 .map(|(table_id, log_delta)| {
807 (
808 TableId::new(*table_id),
809 ChangeLogDeltaCommon {
810 new_log: log_delta.new_log.clone().unwrap().into(),
811 truncate_epoch: log_delta.truncate_epoch,
812 },
813 )
814 })
815 .collect(),
816 state_table_info_delta: pb_version_delta
817 .state_table_info_delta
818 .iter()
819 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
820 .collect(),
821 }
822 }
823}
824
825#[derive(Debug, PartialEq, Clone)]
826pub struct IntraLevelDeltaCommon<T> {
827 pub level_idx: u32,
828 pub l0_sub_level_id: u64,
829 pub removed_table_ids: HashSet<u64>,
830 pub inserted_table_infos: Vec<T>,
831 pub vnode_partition_count: u32,
832 pub compaction_group_version_id: u64,
833}
834
835pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
836
837impl IntraLevelDelta {
838 pub fn estimated_encode_len(&self) -> usize {
839 size_of::<u32>()
840 + size_of::<u64>()
841 + self.removed_table_ids.len() * size_of::<u32>()
842 + self
843 .inserted_table_infos
844 .iter()
845 .map(|sst| sst.estimated_encode_len())
846 .sum::<usize>()
847 + size_of::<u32>()
848 }
849}
850
851impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
852where
853 T: From<PbSstableInfo>,
854{
855 fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
856 Self {
857 level_idx: pb_intra_level_delta.level_idx,
858 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
859 removed_table_ids: HashSet::from_iter(pb_intra_level_delta.removed_table_ids),
860 inserted_table_infos: pb_intra_level_delta
861 .inserted_table_infos
862 .into_iter()
863 .map(Into::into)
864 .collect_vec(),
865 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
866 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
867 }
868 }
869}
870
871impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
872where
873 PbSstableInfo: From<T>,
874{
875 fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
876 Self {
877 level_idx: intra_level_delta.level_idx,
878 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
879 removed_table_ids: intra_level_delta.removed_table_ids.into_iter().collect(),
880 inserted_table_infos: intra_level_delta
881 .inserted_table_infos
882 .into_iter()
883 .map(Into::into)
884 .collect_vec(),
885 vnode_partition_count: intra_level_delta.vnode_partition_count,
886 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
887 }
888 }
889}
890
891impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
892where
893 PbSstableInfo: for<'a> From<&'a T>,
894{
895 fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
896 Self {
897 level_idx: intra_level_delta.level_idx,
898 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
899 removed_table_ids: intra_level_delta
900 .removed_table_ids
901 .iter()
902 .cloned()
903 .collect(),
904 inserted_table_infos: intra_level_delta
905 .inserted_table_infos
906 .iter()
907 .map(Into::into)
908 .collect_vec(),
909 vnode_partition_count: intra_level_delta.vnode_partition_count,
910 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
911 }
912 }
913}
914
915impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
916where
917 T: for<'a> From<&'a PbSstableInfo>,
918{
919 fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
920 Self {
921 level_idx: pb_intra_level_delta.level_idx,
922 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
923 removed_table_ids: HashSet::from_iter(
924 pb_intra_level_delta.removed_table_ids.iter().cloned(),
925 ),
926 inserted_table_infos: pb_intra_level_delta
927 .inserted_table_infos
928 .iter()
929 .map(Into::into)
930 .collect_vec(),
931 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
932 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
933 }
934 }
935}
936
937impl IntraLevelDelta {
938 pub fn new(
939 level_idx: u32,
940 l0_sub_level_id: u64,
941 removed_table_ids: HashSet<u64>,
942 inserted_table_infos: Vec<SstableInfo>,
943 vnode_partition_count: u32,
944 compaction_group_version_id: u64,
945 ) -> Self {
946 Self {
947 level_idx,
948 l0_sub_level_id,
949 removed_table_ids,
950 inserted_table_infos,
951 vnode_partition_count,
952 compaction_group_version_id,
953 }
954 }
955}
956
957#[derive(Debug, PartialEq, Clone)]
958pub enum GroupDeltaCommon<T> {
959 NewL0SubLevel(Vec<T>),
960 IntraLevel(IntraLevelDeltaCommon<T>),
961 GroupConstruct(Box<PbGroupConstruct>),
962 GroupDestroy(PbGroupDestroy),
963 GroupMerge(PbGroupMerge),
964}
965
966pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
967
968impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
969where
970 T: From<PbSstableInfo>,
971{
972 fn from(pb_group_delta: PbGroupDelta) -> Self {
973 match pb_group_delta.delta_type {
974 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
975 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
976 }
977 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
978 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
979 }
980 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
981 GroupDeltaCommon::GroupDestroy(pb_group_destroy)
982 }
983 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
984 GroupDeltaCommon::GroupMerge(pb_group_merge)
985 }
986 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
987 pb_new_sub_level
988 .inserted_table_infos
989 .into_iter()
990 .map(T::from)
991 .collect(),
992 ),
993 None => panic!("delta_type is not set"),
994 }
995 }
996}
997
998impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
999where
1000 PbSstableInfo: From<T>,
1001{
1002 fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1003 match group_delta {
1004 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1005 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1006 },
1007 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1008 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1009 },
1010 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1011 delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1012 },
1013 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1014 delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1015 },
1016 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1017 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1018 inserted_table_infos: new_sub_level
1019 .into_iter()
1020 .map(PbSstableInfo::from)
1021 .collect(),
1022 })),
1023 },
1024 }
1025 }
1026}
1027
1028impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1029where
1030 PbSstableInfo: for<'a> From<&'a T>,
1031{
1032 fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1033 match group_delta {
1034 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1035 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1036 },
1037 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1038 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1039 },
1040 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1041 delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1042 },
1043 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1044 delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1045 },
1046 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1047 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1048 inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1049 })),
1050 },
1051 }
1052 }
1053}
1054
1055impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1056where
1057 T: for<'a> From<&'a PbSstableInfo>,
1058{
1059 fn from(pb_group_delta: &PbGroupDelta) -> Self {
1060 match &pb_group_delta.delta_type {
1061 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1062 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1063 }
1064 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1065 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1066 }
1067 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1068 GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1069 }
1070 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1071 GroupDeltaCommon::GroupMerge(*pb_group_merge)
1072 }
1073 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1074 pb_new_sub_level
1075 .inserted_table_infos
1076 .iter()
1077 .map(T::from)
1078 .collect(),
1079 ),
1080 None => panic!("delta_type is not set"),
1081 }
1082 }
1083}
1084
1085#[derive(Debug, PartialEq, Clone, Default)]
1086pub struct GroupDeltasCommon<T> {
1087 pub group_deltas: Vec<GroupDeltaCommon<T>>,
1088}
1089
1090pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1091
1092impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1093where
1094 T: From<PbSstableInfo>,
1095{
1096 fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1097 Self {
1098 group_deltas: pb_group_deltas
1099 .group_deltas
1100 .into_iter()
1101 .map(GroupDeltaCommon::from)
1102 .collect_vec(),
1103 }
1104 }
1105}
1106
1107impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1108where
1109 PbSstableInfo: From<T>,
1110{
1111 fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1112 Self {
1113 group_deltas: group_deltas
1114 .group_deltas
1115 .into_iter()
1116 .map(|group_delta| group_delta.into())
1117 .collect_vec(),
1118 }
1119 }
1120}
1121
1122impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1123where
1124 PbSstableInfo: for<'a> From<&'a T>,
1125{
1126 fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1127 Self {
1128 group_deltas: group_deltas
1129 .group_deltas
1130 .iter()
1131 .map(|group_delta| group_delta.into())
1132 .collect_vec(),
1133 }
1134 }
1135}
1136
1137impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1138where
1139 T: for<'a> From<&'a PbSstableInfo>,
1140{
1141 fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1142 Self {
1143 group_deltas: pb_group_deltas
1144 .group_deltas
1145 .iter()
1146 .map(GroupDeltaCommon::from)
1147 .collect_vec(),
1148 }
1149 }
1150}
1151
1152impl<T> GroupDeltasCommon<T>
1153where
1154 PbSstableInfo: for<'a> From<&'a T>,
1155{
1156 pub fn to_protobuf(&self) -> PbGroupDeltas {
1157 self.into()
1158 }
1159}
1160
1161impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1162 #[expect(deprecated)]
1163 fn from(delta: HummockVersionDelta) -> Self {
1164 Self {
1165 id: delta.id,
1166 prev_id: delta.prev_id,
1167 group_deltas: delta.group_deltas,
1168 max_committed_epoch: delta.max_committed_epoch,
1169 trivial_move: delta.trivial_move,
1170 new_table_watermarks: delta.new_table_watermarks,
1171 removed_table_ids: delta.removed_table_ids,
1172 change_log_delta: delta
1173 .change_log_delta
1174 .into_iter()
1175 .map(|(k, v)| {
1176 (
1177 k,
1178 ChangeLogDeltaCommon {
1179 truncate_epoch: v.truncate_epoch,
1180 new_log: EpochNewChangeLogCommon {
1181 epochs: v.new_log.epochs,
1182 new_value: Vec::new(),
1183 old_value: Vec::new(),
1184 },
1185 },
1186 )
1187 })
1188 .collect(),
1189 state_table_info_delta: delta.state_table_info_delta,
1190 }
1191 }
1192}
1193
1194impl From<HummockVersion> for LocalHummockVersion {
1195 #[expect(deprecated)]
1196 fn from(version: HummockVersion) -> Self {
1197 Self {
1198 id: version.id,
1199 levels: version.levels,
1200 max_committed_epoch: version.max_committed_epoch,
1201 table_watermarks: version.table_watermarks,
1202 table_change_log: version
1203 .table_change_log
1204 .into_iter()
1205 .map(|(k, v)| {
1206 let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1207 .change_log_into_iter()
1208 .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1209 epochs: epoch_new_change_log.epochs,
1210 new_value: Vec::new(),
1211 old_value: Vec::new(),
1212 })
1213 .collect();
1214 (k, TableChangeLogCommon::new(epoch_new_change_logs))
1215 })
1216 .collect(),
1217 state_table_info: version.state_table_info,
1218 }
1219 }
1220}