1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{
30 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
31};
32use crate::compaction_group::StaticCompactionGroupId;
33use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
34use crate::level::LevelsCommon;
35use crate::sstable_info::SstableInfo;
36use crate::table_watermark::TableWatermarks;
37use crate::vector_index::{VectorIndex, VectorIndexDelta};
38use crate::{
39 CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
40 HummockSstableObjectId, HummockVersionId,
41};
42
43pub const MAX_HUMMOCK_VERSION_ID: HummockVersionId = HummockVersionId::new(i64::MAX as _);
44
45#[derive(Debug, Clone, PartialEq)]
46pub struct HummockVersionStateTableInfo {
47 state_table_info: HashMap<TableId, PbStateTableInfo>,
48
49 compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
51}
52
53impl HummockVersionStateTableInfo {
54 pub fn empty() -> Self {
55 Self {
56 state_table_info: HashMap::new(),
57 compaction_group_member_tables: HashMap::new(),
58 }
59 }
60
61 fn build_compaction_group_member_tables(
62 state_table_info: &HashMap<TableId, PbStateTableInfo>,
63 ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
64 let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
65 for (table_id, info) in state_table_info {
66 assert!(
67 ret.entry(info.compaction_group_id)
68 .or_default()
69 .insert(*table_id)
70 );
71 }
72 ret
73 }
74
75 pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
76 self.state_table_info
77 .iter()
78 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
79 .collect()
80 }
81
82 pub fn from_protobuf(state_table_info: &HashMap<TableId, PbStateTableInfo>) -> Self {
83 let state_table_info = state_table_info
84 .iter()
85 .map(|(table_id, info)| (*table_id, *info))
86 .collect();
87 let compaction_group_member_tables =
88 Self::build_compaction_group_member_tables(&state_table_info);
89 Self {
90 state_table_info,
91 compaction_group_member_tables,
92 }
93 }
94
95 pub fn apply_delta(
96 &mut self,
97 delta: &HashMap<TableId, StateTableInfoDelta>,
98 removed_table_id: &HashSet<TableId>,
99 ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
100 let mut changed_table = HashMap::new();
101 let mut has_bumped_committed_epoch = false;
102 fn remove_table_from_compaction_group(
103 compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
104 compaction_group_id: CompactionGroupId,
105 table_id: TableId,
106 ) {
107 let member_tables = compaction_group_member_tables
108 .get_mut(&compaction_group_id)
109 .expect("should exist");
110 assert!(member_tables.remove(&table_id));
111 if member_tables.is_empty() {
112 assert!(
113 compaction_group_member_tables
114 .remove(&compaction_group_id)
115 .is_some()
116 );
117 }
118 }
119 for table_id in removed_table_id {
120 if let Some(prev_info) = self.state_table_info.remove(table_id) {
121 remove_table_from_compaction_group(
122 &mut self.compaction_group_member_tables,
123 prev_info.compaction_group_id,
124 *table_id,
125 );
126 assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
127 } else {
128 warn!(
129 %table_id,
130 "table to remove does not exist"
131 );
132 }
133 }
134 for (table_id, delta) in delta {
135 if removed_table_id.contains(table_id) {
136 continue;
137 }
138 let new_info = StateTableInfo {
139 committed_epoch: delta.committed_epoch,
140 compaction_group_id: delta.compaction_group_id,
141 };
142 match self.state_table_info.entry(*table_id) {
143 Entry::Occupied(mut entry) => {
144 let prev_info = entry.get_mut();
145 assert!(
146 new_info.committed_epoch >= prev_info.committed_epoch,
147 "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
148 table_id,
149 prev_info,
150 new_info
151 );
152 if new_info.committed_epoch > prev_info.committed_epoch {
153 has_bumped_committed_epoch = true;
154 }
155 if prev_info.compaction_group_id != new_info.compaction_group_id {
156 remove_table_from_compaction_group(
158 &mut self.compaction_group_member_tables,
159 prev_info.compaction_group_id,
160 *table_id,
161 );
162 assert!(
163 self.compaction_group_member_tables
164 .entry(new_info.compaction_group_id)
165 .or_default()
166 .insert(*table_id)
167 );
168 }
169 let prev_info = replace(prev_info, new_info);
170 changed_table.insert(*table_id, Some(prev_info));
171 }
172 Entry::Vacant(entry) => {
173 assert!(
174 self.compaction_group_member_tables
175 .entry(new_info.compaction_group_id)
176 .or_default()
177 .insert(*table_id)
178 );
179 has_bumped_committed_epoch = true;
180 entry.insert(new_info);
181 changed_table.insert(*table_id, None);
182 }
183 }
184 }
185 debug_assert_eq!(
186 self.compaction_group_member_tables,
187 Self::build_compaction_group_member_tables(&self.state_table_info)
188 );
189 (changed_table, has_bumped_committed_epoch)
190 }
191
192 pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
193 &self.state_table_info
194 }
195
196 pub fn compaction_group_member_table_ids(
197 &self,
198 compaction_group_id: CompactionGroupId,
199 ) -> &BTreeSet<TableId> {
200 static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
201 self.compaction_group_member_tables
202 .get(&compaction_group_id)
203 .unwrap_or_else(|| EMPTY_SET.deref())
204 }
205
206 pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
207 &self.compaction_group_member_tables
208 }
209
210 pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
211 self.state_table_info
212 .values()
213 .map(|info| info.committed_epoch)
214 .max()
215 }
216}
217
218#[derive(Debug, Clone, PartialEq)]
219pub struct HummockVersionCommon<T, L = T> {
220 pub id: HummockVersionId,
221 pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
222 #[deprecated]
223 pub(crate) max_committed_epoch: u64,
224 pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
225 pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
226 pub state_table_info: HummockVersionStateTableInfo,
227 pub vector_indexes: HashMap<TableId, VectorIndex>,
228}
229
230pub type HummockVersion = HummockVersionCommon<SstableInfo>;
231
232pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
233
234impl Default for HummockVersion {
235 fn default() -> Self {
236 HummockVersion::from(&PbHummockVersion::default())
237 }
238}
239
240impl<T> HummockVersionCommon<T>
241where
242 T: for<'a> From<&'a PbSstableInfo>,
243 PbSstableInfo: for<'a> From<&'a T>,
244{
245 pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
248 pb_version.into()
249 }
250
251 pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
254 pb_version.into()
255 }
256
257 pub fn to_protobuf(&self) -> PbHummockVersion {
258 self.into()
259 }
260}
261
262impl HummockVersion {
263 pub fn estimated_encode_len(&self) -> usize {
264 self.levels.len() * size_of::<CompactionGroupId>()
265 + self
266 .levels
267 .values()
268 .map(|level| level.estimated_encode_len())
269 .sum::<usize>()
270 + self.table_watermarks.len() * size_of::<u32>()
271 + self
272 .table_watermarks
273 .values()
274 .map(|table_watermark| table_watermark.estimated_encode_len())
275 .sum::<usize>()
276 + self
277 .table_change_log
278 .values()
279 .map(|c| {
280 c.iter()
281 .map(|l| {
282 l.old_value
283 .iter()
284 .chain(l.new_value.iter())
285 .map(|s| s.estimated_encode_len())
286 .sum::<usize>()
287 })
288 .sum::<usize>()
289 })
290 .sum::<usize>()
291 }
292}
293
294impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
295where
296 T: for<'a> From<&'a PbSstableInfo>,
297{
298 fn from(pb_version: &PbHummockVersion) -> Self {
299 #[expect(deprecated)]
300 Self {
301 id: pb_version.id,
302 levels: pb_version
303 .levels
304 .iter()
305 .map(|(group_id, levels)| (*group_id, LevelsCommon::from(levels)))
306 .collect(),
307 max_committed_epoch: pb_version.max_committed_epoch,
308 table_watermarks: pb_version
309 .table_watermarks
310 .iter()
311 .map(|(table_id, table_watermark)| {
312 (*table_id, Arc::new(TableWatermarks::from(table_watermark)))
313 })
314 .collect(),
315 table_change_log: pb_version
316 .table_change_logs
317 .iter()
318 .map(|(table_id, change_log)| {
319 (*table_id, TableChangeLogCommon::from_protobuf(change_log))
320 })
321 .collect(),
322 state_table_info: HummockVersionStateTableInfo::from_protobuf(
323 &pb_version.state_table_info,
324 ),
325 vector_indexes: pb_version
326 .vector_indexes
327 .iter()
328 .map(|(table_id, index)| (*table_id, index.clone().into()))
329 .collect(),
330 }
331 }
332}
333
334impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
335where
336 PbSstableInfo: for<'a> From<&'a T>,
337{
338 fn from(version: &HummockVersionCommon<T>) -> Self {
339 #[expect(deprecated)]
340 Self {
341 id: version.id,
342 levels: version
343 .levels
344 .iter()
345 .map(|(group_id, levels)| (*group_id, levels.into()))
346 .collect(),
347 max_committed_epoch: version.max_committed_epoch,
348 table_watermarks: version
349 .table_watermarks
350 .iter()
351 .map(|(table_id, watermark)| (*table_id, watermark.as_ref().into()))
352 .collect(),
353 table_change_logs: version
354 .table_change_log
355 .iter()
356 .map(|(table_id, change_log)| (*table_id, change_log.to_protobuf()))
357 .collect(),
358 state_table_info: version.state_table_info.state_table_info.clone(),
359 vector_indexes: version
360 .vector_indexes
361 .iter()
362 .map(|(table_id, index)| (*table_id, index.clone().into()))
363 .collect(),
364 }
365 }
366}
367
368impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
369where
370 PbSstableInfo: From<T>,
371 PbSstableInfo: for<'a> From<&'a T>,
372{
373 fn from(version: HummockVersionCommon<T>) -> Self {
374 #[expect(deprecated)]
375 Self {
376 id: version.id,
377 levels: version
378 .levels
379 .into_iter()
380 .map(|(group_id, levels)| (group_id, levels.into()))
381 .collect(),
382 max_committed_epoch: version.max_committed_epoch,
383 table_watermarks: version
384 .table_watermarks
385 .into_iter()
386 .map(|(table_id, watermark)| (table_id, watermark.as_ref().into()))
387 .collect(),
388 table_change_logs: version
389 .table_change_log
390 .into_iter()
391 .map(|(table_id, change_log)| (table_id, change_log.to_protobuf()))
392 .collect(),
393 state_table_info: version.state_table_info.state_table_info.clone(),
394 vector_indexes: version
395 .vector_indexes
396 .into_iter()
397 .map(|(table_id, index)| (table_id, index.into()))
398 .collect(),
399 }
400 }
401}
402
403impl HummockVersion {
404 pub fn next_version_id(&self) -> HummockVersionId {
405 self.id + 1
406 }
407
408 pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
409 self.state_table_info.state_table_info.is_empty()
411 && self.levels.values().any(|group| {
412 #[expect(deprecated)]
414 !group.member_table_ids.is_empty()
415 })
416 }
417
418 pub fn may_fill_backward_compatible_state_table_info_delta(
419 &self,
420 delta: &mut HummockVersionDelta,
421 ) {
422 #[expect(deprecated)]
423 for (cg_id, group) in &self.levels {
425 for table_id in &group.member_table_ids {
426 assert!(
427 delta
428 .state_table_info_delta
429 .insert(
430 (*table_id).into(),
431 StateTableInfoDelta {
432 committed_epoch: self.max_committed_epoch,
433 compaction_group_id: *cg_id,
434 }
435 )
436 .is_none(),
437 "duplicate table id {} in cg {}",
438 table_id,
439 cg_id
440 );
441 }
442 }
443 }
444
445 pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
446 #[expect(deprecated)]
447 let mut init_version = HummockVersion {
448 id: FIRST_VERSION_ID,
449 levels: Default::default(),
450 max_committed_epoch: INVALID_EPOCH,
451 table_watermarks: HashMap::new(),
452 table_change_log: HashMap::new(),
453 state_table_info: HummockVersionStateTableInfo::empty(),
454 vector_indexes: Default::default(),
455 };
456 for group_id in [
457 StaticCompactionGroupId::StateDefault as CompactionGroupId,
458 StaticCompactionGroupId::MaterializedView as CompactionGroupId,
459 ] {
460 init_version.levels.insert(
461 group_id,
462 build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
463 );
464 }
465 init_version
466 }
467
468 pub fn version_delta_after(&self) -> HummockVersionDelta {
469 #[expect(deprecated)]
470 HummockVersionDelta {
471 id: self.next_version_id(),
472 prev_id: self.id,
473 trivial_move: false,
474 max_committed_epoch: self.max_committed_epoch,
475 group_deltas: Default::default(),
476 new_table_watermarks: HashMap::new(),
477 removed_table_ids: HashSet::new(),
478 change_log_delta: HashMap::new(),
479 state_table_info_delta: Default::default(),
480 vector_index_delta: Default::default(),
481 }
482 }
483
484 pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
485 let table_change_log = {
486 let mut table_change_log = HashMap::new();
487 for (table_id, log) in &mut self.table_change_log {
488 let change_log_iter =
489 log.change_log_iter_mut()
490 .map(|item| EpochNewChangeLogCommon {
491 new_value: std::mem::take(&mut item.new_value),
492 old_value: std::mem::take(&mut item.old_value),
493 non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
494 checkpoint_epoch: item.checkpoint_epoch,
495 });
496 table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
497 }
498
499 table_change_log
500 };
501
502 let local_version = LocalHummockVersion::from(self);
503
504 (local_version, table_change_log)
505 }
506}
507
508impl<T, L> HummockVersionCommon<T, L> {
509 pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
510 self.state_table_info
511 .info()
512 .get(&table_id)
513 .map(|info| info.committed_epoch)
514 }
515}
516
517#[derive(Debug, PartialEq, Clone)]
518pub struct HummockVersionDeltaCommon<T, L = T> {
519 pub id: HummockVersionId,
520 pub prev_id: HummockVersionId,
521 pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
522 #[deprecated]
523 pub(crate) max_committed_epoch: u64,
524 pub trivial_move: bool,
525 pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
526 pub removed_table_ids: HashSet<TableId>,
527 pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
528 pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
529 pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
530}
531
532pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;
533
534pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
535
536impl Default for HummockVersionDelta {
537 fn default() -> Self {
538 HummockVersionDelta::from(&PbHummockVersionDelta::default())
539 }
540}
541
542impl<T> HummockVersionDeltaCommon<T>
543where
544 T: for<'a> From<&'a PbSstableInfo>,
545 PbSstableInfo: for<'a> From<&'a T>,
546{
547 pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
550 delta.into()
551 }
552
553 pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
556 delta.into()
557 }
558
559 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
560 self.into()
561 }
562}
563
564pub trait SstableIdReader {
565 fn sst_id(&self) -> HummockSstableId;
566}
567
568pub trait ObjectIdReader {
569 fn object_id(&self) -> HummockSstableObjectId;
570}
571
572impl<T> HummockVersionDeltaCommon<T>
573where
574 T: SstableIdReader + ObjectIdReader,
575{
576 pub fn newly_added_object_ids(
581 &self,
582 exclude_table_change_log: bool,
583 ) -> HashSet<HummockObjectId> {
584 match HummockObjectId::Sstable(0.into()) {
588 HummockObjectId::Sstable(_) => {}
589 HummockObjectId::VectorFile(_) => {}
590 HummockObjectId::HnswGraphFile(_) => {}
591 };
592 self.newly_added_sst_infos(exclude_table_change_log)
593 .map(|sst| HummockObjectId::Sstable(sst.object_id()))
594 .chain(
595 self.vector_index_delta
596 .values()
597 .flat_map(|vector_index_delta| {
598 vector_index_delta
599 .newly_added_objects()
600 .map(|(object_id, _)| object_id)
601 }),
602 )
603 .collect()
604 }
605
606 pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
607 self.newly_added_sst_infos(exclude_table_change_log)
608 .map(|sst| sst.sst_id())
609 .collect()
610 }
611}
612
613impl<T> HummockVersionDeltaCommon<T> {
614 pub fn newly_added_sst_infos(
615 &self,
616 exclude_table_change_log: bool,
617 ) -> impl Iterator<Item = &'_ T> {
618 let may_table_change_delta = if exclude_table_change_log {
619 None
620 } else {
621 Some(self.change_log_delta.values())
622 };
623 self.group_deltas
624 .values()
625 .flat_map(|group_deltas| {
626 group_deltas.group_deltas.iter().flat_map(|group_delta| {
627 let sst_slice = match &group_delta {
628 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
629 | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
630 inserted_table_infos,
631 ..
632 }) => Some(inserted_table_infos.iter()),
633 GroupDeltaCommon::GroupConstruct(_)
634 | GroupDeltaCommon::GroupDestroy(_)
635 | GroupDeltaCommon::GroupMerge(_)
636 | GroupDeltaCommon::TruncateTables(_) => None,
637 };
638 sst_slice.into_iter().flatten()
639 })
640 })
641 .chain(
642 may_table_change_delta
643 .map(|v| {
644 v.flat_map(|delta| {
645 let new_log = &delta.new_log;
646 new_log.new_value.iter().chain(new_log.old_value.iter())
647 })
648 })
649 .into_iter()
650 .flatten(),
651 )
652 }
653}
654
655impl HummockVersionDelta {
656 #[expect(deprecated)]
657 pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
658 self.max_committed_epoch
659 }
660}
661
662impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
663where
664 T: for<'a> From<&'a PbSstableInfo>,
665{
666 fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
667 #[expect(deprecated)]
668 Self {
669 id: pb_version_delta.id,
670 prev_id: pb_version_delta.prev_id,
671 group_deltas: pb_version_delta
672 .group_deltas
673 .iter()
674 .map(|(group_id, deltas)| (*group_id, GroupDeltasCommon::from(deltas)))
675 .collect(),
676 max_committed_epoch: pb_version_delta.max_committed_epoch,
677 trivial_move: pb_version_delta.trivial_move,
678 new_table_watermarks: pb_version_delta
679 .new_table_watermarks
680 .iter()
681 .map(|(table_id, watermarks)| (*table_id, TableWatermarks::from(watermarks)))
682 .collect(),
683 removed_table_ids: pb_version_delta.removed_table_ids.iter().copied().collect(),
684 change_log_delta: pb_version_delta
685 .change_log_delta
686 .iter()
687 .map(|(table_id, log_delta)| {
688 (
689 *table_id,
690 ChangeLogDeltaCommon {
691 truncate_epoch: log_delta.truncate_epoch,
692 new_log: log_delta.new_log.as_ref().unwrap().into(),
693 },
694 )
695 })
696 .collect(),
697
698 state_table_info_delta: pb_version_delta
699 .state_table_info_delta
700 .iter()
701 .map(|(table_id, delta)| (*table_id, *delta))
702 .collect(),
703 vector_index_delta: pb_version_delta
704 .vector_index_delta
705 .iter()
706 .map(|(table_id, delta)| (*table_id, delta.clone().into()))
707 .collect(),
708 }
709 }
710}
711
712impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
713where
714 PbSstableInfo: for<'a> From<&'a T>,
715{
716 fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
717 #[expect(deprecated)]
718 Self {
719 id: version_delta.id,
720 prev_id: version_delta.prev_id,
721 group_deltas: version_delta
722 .group_deltas
723 .iter()
724 .map(|(group_id, deltas)| (*group_id, deltas.into()))
725 .collect(),
726 max_committed_epoch: version_delta.max_committed_epoch,
727 trivial_move: version_delta.trivial_move,
728 new_table_watermarks: version_delta
729 .new_table_watermarks
730 .iter()
731 .map(|(table_id, watermarks)| (*table_id, watermarks.into()))
732 .collect(),
733 removed_table_ids: version_delta.removed_table_ids.iter().copied().collect(),
734 change_log_delta: version_delta
735 .change_log_delta
736 .iter()
737 .map(|(table_id, log_delta)| (*table_id, log_delta.into()))
738 .collect(),
739 state_table_info_delta: version_delta.state_table_info_delta.clone(),
740 vector_index_delta: version_delta
741 .vector_index_delta
742 .iter()
743 .map(|(table_id, delta)| (*table_id, delta.clone().into()))
744 .collect(),
745 }
746 }
747}
748
749impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
750where
751 PbSstableInfo: From<T>,
752{
753 fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
754 #[expect(deprecated)]
755 Self {
756 id: version_delta.id,
757 prev_id: version_delta.prev_id,
758 group_deltas: version_delta
759 .group_deltas
760 .into_iter()
761 .map(|(group_id, deltas)| (group_id, deltas.into()))
762 .collect(),
763 max_committed_epoch: version_delta.max_committed_epoch,
764 trivial_move: version_delta.trivial_move,
765 new_table_watermarks: version_delta
766 .new_table_watermarks
767 .into_iter()
768 .map(|(table_id, watermarks)| (table_id, watermarks.into()))
769 .collect(),
770 removed_table_ids: version_delta.removed_table_ids.into_iter().collect(),
771 change_log_delta: version_delta
772 .change_log_delta
773 .into_iter()
774 .map(|(table_id, log_delta)| (table_id, log_delta.into()))
775 .collect(),
776 state_table_info_delta: version_delta.state_table_info_delta,
777 vector_index_delta: version_delta
778 .vector_index_delta
779 .into_iter()
780 .map(|(table_id, delta)| (table_id, delta.into()))
781 .collect(),
782 }
783 }
784}
785
786impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
787where
788 T: From<PbSstableInfo>,
789{
790 fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
791 #[expect(deprecated)]
792 Self {
793 id: pb_version_delta.id,
794 prev_id: pb_version_delta.prev_id,
795 group_deltas: pb_version_delta
796 .group_deltas
797 .into_iter()
798 .map(|(group_id, deltas)| (group_id, deltas.into()))
799 .collect(),
800 max_committed_epoch: pb_version_delta.max_committed_epoch,
801 trivial_move: pb_version_delta.trivial_move,
802 new_table_watermarks: pb_version_delta
803 .new_table_watermarks
804 .into_iter()
805 .map(|(table_id, watermarks)| (table_id, watermarks.into()))
806 .collect(),
807 removed_table_ids: pb_version_delta.removed_table_ids.into_iter().collect(),
808 change_log_delta: pb_version_delta
809 .change_log_delta
810 .iter()
811 .map(|(table_id, log_delta)| {
812 (
813 *table_id,
814 ChangeLogDeltaCommon {
815 new_log: log_delta.new_log.clone().unwrap().into(),
816 truncate_epoch: log_delta.truncate_epoch,
817 },
818 )
819 })
820 .collect(),
821 state_table_info_delta: pb_version_delta
822 .state_table_info_delta
823 .iter()
824 .map(|(table_id, delta)| (*table_id, *delta))
825 .collect(),
826 vector_index_delta: pb_version_delta
827 .vector_index_delta
828 .into_iter()
829 .map(|(table_id, delta)| (table_id, delta.into()))
830 .collect(),
831 }
832 }
833}
834
835#[derive(Debug, PartialEq, Clone)]
836pub struct IntraLevelDeltaCommon<T> {
837 pub level_idx: u32,
838 pub l0_sub_level_id: u64,
839 pub removed_table_ids: HashSet<HummockSstableId>,
840 pub inserted_table_infos: Vec<T>,
841 pub vnode_partition_count: u32,
842 pub compaction_group_version_id: u64,
843}
844
845pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
846
847impl IntraLevelDelta {
848 pub fn estimated_encode_len(&self) -> usize {
849 size_of::<u32>()
850 + size_of::<u64>()
851 + self.removed_table_ids.len() * size_of::<u32>()
852 + self
853 .inserted_table_infos
854 .iter()
855 .map(|sst| sst.estimated_encode_len())
856 .sum::<usize>()
857 + size_of::<u32>()
858 }
859}
860
861impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
862where
863 T: From<PbSstableInfo>,
864{
865 fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
866 Self {
867 level_idx: pb_intra_level_delta.level_idx,
868 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
869 removed_table_ids: HashSet::from_iter(
870 pb_intra_level_delta.removed_table_ids.iter().copied(),
871 ),
872 inserted_table_infos: pb_intra_level_delta
873 .inserted_table_infos
874 .into_iter()
875 .map(Into::into)
876 .collect_vec(),
877 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
878 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
879 }
880 }
881}
882
883impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
884where
885 PbSstableInfo: From<T>,
886{
887 fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
888 Self {
889 level_idx: intra_level_delta.level_idx,
890 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
891 removed_table_ids: intra_level_delta.removed_table_ids.into_iter().collect(),
892 inserted_table_infos: intra_level_delta
893 .inserted_table_infos
894 .into_iter()
895 .map(Into::into)
896 .collect_vec(),
897 vnode_partition_count: intra_level_delta.vnode_partition_count,
898 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
899 }
900 }
901}
902
903impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
904where
905 PbSstableInfo: for<'a> From<&'a T>,
906{
907 fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
908 Self {
909 level_idx: intra_level_delta.level_idx,
910 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
911 removed_table_ids: intra_level_delta
912 .removed_table_ids
913 .iter()
914 .copied()
915 .collect(),
916 inserted_table_infos: intra_level_delta
917 .inserted_table_infos
918 .iter()
919 .map(Into::into)
920 .collect_vec(),
921 vnode_partition_count: intra_level_delta.vnode_partition_count,
922 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
923 }
924 }
925}
926
927impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
928where
929 T: for<'a> From<&'a PbSstableInfo>,
930{
931 fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
932 Self {
933 level_idx: pb_intra_level_delta.level_idx,
934 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
935 removed_table_ids: HashSet::from_iter(
936 pb_intra_level_delta.removed_table_ids.iter().copied(),
937 ),
938 inserted_table_infos: pb_intra_level_delta
939 .inserted_table_infos
940 .iter()
941 .map(Into::into)
942 .collect_vec(),
943 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
944 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
945 }
946 }
947}
948
949impl IntraLevelDelta {
950 pub fn new(
951 level_idx: u32,
952 l0_sub_level_id: u64,
953 removed_table_ids: HashSet<HummockSstableId>,
954 inserted_table_infos: Vec<SstableInfo>,
955 vnode_partition_count: u32,
956 compaction_group_version_id: u64,
957 ) -> Self {
958 Self {
959 level_idx,
960 l0_sub_level_id,
961 removed_table_ids,
962 inserted_table_infos,
963 vnode_partition_count,
964 compaction_group_version_id,
965 }
966 }
967}
968
969#[derive(Debug, PartialEq, Clone)]
970pub enum GroupDeltaCommon<T> {
971 NewL0SubLevel(Vec<T>),
972 IntraLevel(IntraLevelDeltaCommon<T>),
973 GroupConstruct(Box<PbGroupConstruct>),
974 GroupDestroy(PbGroupDestroy),
975 GroupMerge(PbGroupMerge),
976 TruncateTables(HashSet<TableId>),
977}
978
979pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
980
981impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
982where
983 T: From<PbSstableInfo>,
984{
985 fn from(pb_group_delta: PbGroupDelta) -> Self {
986 match pb_group_delta.delta_type {
987 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
988 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
989 }
990 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
991 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
992 }
993 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
994 GroupDeltaCommon::GroupDestroy(pb_group_destroy)
995 }
996 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
997 GroupDeltaCommon::GroupMerge(pb_group_merge)
998 }
999 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1000 pb_new_sub_level
1001 .inserted_table_infos
1002 .into_iter()
1003 .map(T::from)
1004 .collect(),
1005 ),
1006 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1007 GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids.into_iter().collect())
1008 }
1009
1010 None => panic!("delta_type is not set"),
1011 }
1012 }
1013}
1014
1015impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1016where
1017 PbSstableInfo: From<T>,
1018{
1019 fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1020 match group_delta {
1021 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1022 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1023 },
1024 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1025 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1026 },
1027 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1028 delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1029 },
1030 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1031 delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1032 },
1033 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1034 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1035 inserted_table_infos: new_sub_level
1036 .into_iter()
1037 .map(PbSstableInfo::from)
1038 .collect(),
1039 })),
1040 },
1041 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1042 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1043 table_ids: table_ids.iter().copied().collect(),
1044 })),
1045 },
1046 }
1047 }
1048}
1049
1050impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1051where
1052 PbSstableInfo: for<'a> From<&'a T>,
1053{
1054 fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1055 match group_delta {
1056 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1057 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1058 },
1059 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1060 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1061 },
1062 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1063 delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1064 },
1065 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1066 delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1067 },
1068 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1069 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1070 inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1071 })),
1072 },
1073 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1074 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1075 table_ids: table_ids.iter().copied().collect(),
1076 })),
1077 },
1078 }
1079 }
1080}
1081
1082impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1083where
1084 T: for<'a> From<&'a PbSstableInfo>,
1085{
1086 fn from(pb_group_delta: &PbGroupDelta) -> Self {
1087 match &pb_group_delta.delta_type {
1088 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1089 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1090 }
1091 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1092 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1093 }
1094 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1095 GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1096 }
1097 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1098 GroupDeltaCommon::GroupMerge(*pb_group_merge)
1099 }
1100 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1101 pb_new_sub_level
1102 .inserted_table_infos
1103 .iter()
1104 .map(T::from)
1105 .collect(),
1106 ),
1107 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1108 GroupDeltaCommon::TruncateTables(
1109 pb_truncate_tables.table_ids.iter().copied().collect(),
1110 )
1111 }
1112 None => panic!("delta_type is not set"),
1113 }
1114 }
1115}
1116
1117#[derive(Debug, PartialEq, Clone)]
1118pub struct GroupDeltasCommon<T> {
1119 pub group_deltas: Vec<GroupDeltaCommon<T>>,
1120}
1121
1122impl<T> Default for GroupDeltasCommon<T> {
1123 fn default() -> Self {
1124 Self {
1125 group_deltas: vec![],
1126 }
1127 }
1128}
1129
1130pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1131
1132impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1133where
1134 T: From<PbSstableInfo>,
1135{
1136 fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1137 Self {
1138 group_deltas: pb_group_deltas
1139 .group_deltas
1140 .into_iter()
1141 .map(GroupDeltaCommon::from)
1142 .collect_vec(),
1143 }
1144 }
1145}
1146
1147impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1148where
1149 PbSstableInfo: From<T>,
1150{
1151 fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1152 Self {
1153 group_deltas: group_deltas
1154 .group_deltas
1155 .into_iter()
1156 .map(|group_delta| group_delta.into())
1157 .collect_vec(),
1158 }
1159 }
1160}
1161
1162impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1163where
1164 PbSstableInfo: for<'a> From<&'a T>,
1165{
1166 fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1167 Self {
1168 group_deltas: group_deltas
1169 .group_deltas
1170 .iter()
1171 .map(|group_delta| group_delta.into())
1172 .collect_vec(),
1173 }
1174 }
1175}
1176
1177impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1178where
1179 T: for<'a> From<&'a PbSstableInfo>,
1180{
1181 fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1182 Self {
1183 group_deltas: pb_group_deltas
1184 .group_deltas
1185 .iter()
1186 .map(GroupDeltaCommon::from)
1187 .collect_vec(),
1188 }
1189 }
1190}
1191
1192impl<T> GroupDeltasCommon<T>
1193where
1194 PbSstableInfo: for<'a> From<&'a T>,
1195{
1196 pub fn to_protobuf(&self) -> PbGroupDeltas {
1197 self.into()
1198 }
1199}
1200
1201impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1202 #[expect(deprecated)]
1203 fn from(delta: HummockVersionDelta) -> Self {
1204 Self {
1205 id: delta.id,
1206 prev_id: delta.prev_id,
1207 group_deltas: delta.group_deltas,
1208 max_committed_epoch: delta.max_committed_epoch,
1209 trivial_move: delta.trivial_move,
1210 new_table_watermarks: delta.new_table_watermarks,
1211 removed_table_ids: delta.removed_table_ids,
1212 change_log_delta: delta
1213 .change_log_delta
1214 .into_iter()
1215 .map(|(k, v)| {
1216 (
1217 k,
1218 ChangeLogDeltaCommon {
1219 truncate_epoch: v.truncate_epoch,
1220 new_log: EpochNewChangeLogCommon {
1221 new_value: Vec::new(),
1222 old_value: Vec::new(),
1223 non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1224 checkpoint_epoch: v.new_log.checkpoint_epoch,
1225 },
1226 },
1227 )
1228 })
1229 .collect(),
1230 state_table_info_delta: delta.state_table_info_delta,
1231 vector_index_delta: delta.vector_index_delta,
1232 }
1233 }
1234}
1235
1236impl From<HummockVersion> for LocalHummockVersion {
1237 #[expect(deprecated)]
1238 fn from(version: HummockVersion) -> Self {
1239 Self {
1240 id: version.id,
1241 levels: version.levels,
1242 max_committed_epoch: version.max_committed_epoch,
1243 table_watermarks: version.table_watermarks,
1244 table_change_log: version
1245 .table_change_log
1246 .into_iter()
1247 .map(|(k, v)| {
1248 let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1249 .change_log_into_iter()
1250 .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1251 new_value: Vec::new(),
1252 old_value: Vec::new(),
1253 non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1254 checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1255 })
1256 .collect();
1257 (k, TableChangeLogCommon::new(epoch_new_change_logs))
1258 })
1259 .collect(),
1260 state_table_info: version.state_table_info,
1261 vector_indexes: version.vector_indexes,
1262 }
1263 }
1264}