1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{
30 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
31};
32use crate::compaction_group::StaticCompactionGroupId;
33use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
34use crate::level::LevelsCommon;
35use crate::sstable_info::SstableInfo;
36use crate::table_watermark::TableWatermarks;
37use crate::vector_index::{VectorIndex, VectorIndexDelta};
38use crate::{
39 CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
40 HummockSstableObjectId, HummockVersionId,
41};
42
/// Per-table state info of a hummock version, together with an index of the member tables of
/// each compaction group.
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionStateTableInfo {
    /// State info (committed epoch and compaction group id) keyed by table id.
    state_table_info: HashMap<TableId, PbStateTableInfo>,

    /// In-memory index derived from `state_table_info`: the ids of the tables that belong to
    /// each compaction group. It is rebuilt in `from_protobuf` and is not emitted by
    /// `to_protobuf`.
    compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
}
50
51impl HummockVersionStateTableInfo {
52 pub fn empty() -> Self {
53 Self {
54 state_table_info: HashMap::new(),
55 compaction_group_member_tables: HashMap::new(),
56 }
57 }
58
59 fn build_compaction_group_member_tables(
60 state_table_info: &HashMap<TableId, PbStateTableInfo>,
61 ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
62 let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
63 for (table_id, info) in state_table_info {
64 assert!(
65 ret.entry(info.compaction_group_id)
66 .or_default()
67 .insert(*table_id)
68 );
69 }
70 ret
71 }
72
73 pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
74 self.state_table_info
75 .iter()
76 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
77 .collect()
78 }
79
80 pub fn from_protobuf(state_table_info: &HashMap<u32, PbStateTableInfo>) -> Self {
81 let state_table_info = state_table_info
82 .iter()
83 .map(|(table_id, info)| (TableId::new(*table_id), *info))
84 .collect();
85 let compaction_group_member_tables =
86 Self::build_compaction_group_member_tables(&state_table_info);
87 Self {
88 state_table_info,
89 compaction_group_member_tables,
90 }
91 }
92
93 pub fn to_protobuf(&self) -> HashMap<u32, PbStateTableInfo> {
94 self.state_table_info
95 .iter()
96 .map(|(table_id, info)| (table_id.table_id, *info))
97 .collect()
98 }
99
100 pub fn apply_delta(
101 &mut self,
102 delta: &HashMap<TableId, StateTableInfoDelta>,
103 removed_table_id: &HashSet<TableId>,
104 ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
105 let mut changed_table = HashMap::new();
106 let mut has_bumped_committed_epoch = false;
107 fn remove_table_from_compaction_group(
108 compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
109 compaction_group_id: CompactionGroupId,
110 table_id: TableId,
111 ) {
112 let member_tables = compaction_group_member_tables
113 .get_mut(&compaction_group_id)
114 .expect("should exist");
115 assert!(member_tables.remove(&table_id));
116 if member_tables.is_empty() {
117 assert!(
118 compaction_group_member_tables
119 .remove(&compaction_group_id)
120 .is_some()
121 );
122 }
123 }
124 for table_id in removed_table_id {
125 if let Some(prev_info) = self.state_table_info.remove(table_id) {
126 remove_table_from_compaction_group(
127 &mut self.compaction_group_member_tables,
128 prev_info.compaction_group_id,
129 *table_id,
130 );
131 assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
132 } else {
133 warn!(
134 table_id = table_id.table_id,
135 "table to remove does not exist"
136 );
137 }
138 }
139 for (table_id, delta) in delta {
140 if removed_table_id.contains(table_id) {
141 continue;
142 }
143 let new_info = StateTableInfo {
144 committed_epoch: delta.committed_epoch,
145 compaction_group_id: delta.compaction_group_id,
146 };
147 match self.state_table_info.entry(*table_id) {
148 Entry::Occupied(mut entry) => {
149 let prev_info = entry.get_mut();
150 assert!(
151 new_info.committed_epoch >= prev_info.committed_epoch,
152 "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
153 table_id.table_id,
154 prev_info,
155 new_info
156 );
157 if new_info.committed_epoch > prev_info.committed_epoch {
158 has_bumped_committed_epoch = true;
159 }
160 if prev_info.compaction_group_id != new_info.compaction_group_id {
161 remove_table_from_compaction_group(
163 &mut self.compaction_group_member_tables,
164 prev_info.compaction_group_id,
165 *table_id,
166 );
167 assert!(
168 self.compaction_group_member_tables
169 .entry(new_info.compaction_group_id)
170 .or_default()
171 .insert(*table_id)
172 );
173 }
174 let prev_info = replace(prev_info, new_info);
175 changed_table.insert(*table_id, Some(prev_info));
176 }
177 Entry::Vacant(entry) => {
178 assert!(
179 self.compaction_group_member_tables
180 .entry(new_info.compaction_group_id)
181 .or_default()
182 .insert(*table_id)
183 );
184 has_bumped_committed_epoch = true;
185 entry.insert(new_info);
186 changed_table.insert(*table_id, None);
187 }
188 }
189 }
190 debug_assert_eq!(
191 self.compaction_group_member_tables,
192 Self::build_compaction_group_member_tables(&self.state_table_info)
193 );
194 (changed_table, has_bumped_committed_epoch)
195 }
196
197 pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
198 &self.state_table_info
199 }
200
201 pub fn compaction_group_member_table_ids(
202 &self,
203 compaction_group_id: CompactionGroupId,
204 ) -> &BTreeSet<TableId> {
205 static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
206 self.compaction_group_member_tables
207 .get(&compaction_group_id)
208 .unwrap_or_else(|| EMPTY_SET.deref())
209 }
210
211 pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
212 &self.compaction_group_member_tables
213 }
214
215 pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
216 self.state_table_info
217 .values()
218 .map(|info| info.committed_epoch)
219 .max()
220 }
221}
222
/// In-memory representation of a hummock version.
///
/// `T` is the SST info type stored in the levels and `L` is the SST info type stored in the
/// table change logs; `L` defaults to `T`.
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionCommon<T, L = T> {
    /// Id of this version.
    pub id: HummockVersionId,
    /// Levels of each compaction group, keyed by compaction group id.
    pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
    /// Deprecated field; it is still copied from and to the protobuf message.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    /// Table watermarks keyed by table id.
    pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
    /// Table change logs keyed by table id.
    pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
    /// Per-table state info (committed epoch and compaction group membership).
    pub state_table_info: HummockVersionStateTableInfo,
    /// Vector indexes keyed by table id.
    pub vector_indexes: HashMap<TableId, VectorIndex>,
}
234
235pub type HummockVersion = HummockVersionCommon<SstableInfo>;
236
237pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
238
239impl Default for HummockVersion {
240 fn default() -> Self {
241 HummockVersion::from(&PbHummockVersion::default())
242 }
243}
244
245impl<T> HummockVersionCommon<T>
246where
247 T: for<'a> From<&'a PbSstableInfo>,
248 PbSstableInfo: for<'a> From<&'a T>,
249{
250 pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
253 pb_version.into()
254 }
255
256 pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
259 pb_version.into()
260 }
261
262 pub fn to_protobuf(&self) -> PbHummockVersion {
263 self.into()
264 }
265}
266
267impl HummockVersion {
268 pub fn estimated_encode_len(&self) -> usize {
269 self.levels.len() * size_of::<CompactionGroupId>()
270 + self
271 .levels
272 .values()
273 .map(|level| level.estimated_encode_len())
274 .sum::<usize>()
275 + self.table_watermarks.len() * size_of::<u32>()
276 + self
277 .table_watermarks
278 .values()
279 .map(|table_watermark| table_watermark.estimated_encode_len())
280 .sum::<usize>()
281 + self
282 .table_change_log
283 .values()
284 .map(|c| {
285 c.iter()
286 .map(|l| {
287 l.old_value
288 .iter()
289 .chain(l.new_value.iter())
290 .map(|s| s.estimated_encode_len())
291 .sum::<usize>()
292 })
293 .sum::<usize>()
294 })
295 .sum::<usize>()
296 }
297}
298
299impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
300where
301 T: for<'a> From<&'a PbSstableInfo>,
302{
303 fn from(pb_version: &PbHummockVersion) -> Self {
304 #[expect(deprecated)]
305 Self {
306 id: HummockVersionId(pb_version.id),
307 levels: pb_version
308 .levels
309 .iter()
310 .map(|(group_id, levels)| {
311 (*group_id as CompactionGroupId, LevelsCommon::from(levels))
312 })
313 .collect(),
314 max_committed_epoch: pb_version.max_committed_epoch,
315 table_watermarks: pb_version
316 .table_watermarks
317 .iter()
318 .map(|(table_id, table_watermark)| {
319 (
320 TableId::new(*table_id),
321 Arc::new(TableWatermarks::from(table_watermark)),
322 )
323 })
324 .collect(),
325 table_change_log: pb_version
326 .table_change_logs
327 .iter()
328 .map(|(table_id, change_log)| {
329 (
330 TableId::new(*table_id),
331 TableChangeLogCommon::from_protobuf(change_log),
332 )
333 })
334 .collect(),
335 state_table_info: HummockVersionStateTableInfo::from_protobuf(
336 &pb_version.state_table_info,
337 ),
338 vector_indexes: pb_version
339 .vector_indexes
340 .iter()
341 .map(|(table_id, index)| (table_id.into(), index.clone().into()))
342 .collect(),
343 }
344 }
345}
346
347impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
348where
349 PbSstableInfo: for<'a> From<&'a T>,
350{
351 fn from(version: &HummockVersionCommon<T>) -> Self {
352 #[expect(deprecated)]
353 Self {
354 id: version.id.0,
355 levels: version
356 .levels
357 .iter()
358 .map(|(group_id, levels)| (*group_id as _, levels.into()))
359 .collect(),
360 max_committed_epoch: version.max_committed_epoch,
361 table_watermarks: version
362 .table_watermarks
363 .iter()
364 .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
365 .collect(),
366 table_change_logs: version
367 .table_change_log
368 .iter()
369 .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
370 .collect(),
371 state_table_info: version.state_table_info.to_protobuf(),
372 vector_indexes: version
373 .vector_indexes
374 .iter()
375 .map(|(table_id, index)| (table_id.table_id, index.clone().into()))
376 .collect(),
377 }
378 }
379}
380
381impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
382where
383 PbSstableInfo: From<T>,
384 PbSstableInfo: for<'a> From<&'a T>,
385{
386 fn from(version: HummockVersionCommon<T>) -> Self {
387 #[expect(deprecated)]
388 Self {
389 id: version.id.0,
390 levels: version
391 .levels
392 .into_iter()
393 .map(|(group_id, levels)| (group_id as _, levels.into()))
394 .collect(),
395 max_committed_epoch: version.max_committed_epoch,
396 table_watermarks: version
397 .table_watermarks
398 .into_iter()
399 .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
400 .collect(),
401 table_change_logs: version
402 .table_change_log
403 .into_iter()
404 .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
405 .collect(),
406 state_table_info: version.state_table_info.to_protobuf(),
407 vector_indexes: version
408 .vector_indexes
409 .into_iter()
410 .map(|(table_id, index)| (table_id.table_id, index.into()))
411 .collect(),
412 }
413 }
414}
415
416impl HummockVersion {
417 pub fn next_version_id(&self) -> HummockVersionId {
418 self.id.next()
419 }
420
421 pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
422 self.state_table_info.state_table_info.is_empty()
424 && self.levels.values().any(|group| {
425 #[expect(deprecated)]
427 !group.member_table_ids.is_empty()
428 })
429 }
430
431 pub fn may_fill_backward_compatible_state_table_info_delta(
432 &self,
433 delta: &mut HummockVersionDelta,
434 ) {
435 #[expect(deprecated)]
436 for (cg_id, group) in &self.levels {
438 for table_id in &group.member_table_ids {
439 assert!(
440 delta
441 .state_table_info_delta
442 .insert(
443 TableId::new(*table_id),
444 StateTableInfoDelta {
445 committed_epoch: self.max_committed_epoch,
446 compaction_group_id: *cg_id,
447 }
448 )
449 .is_none(),
450 "duplicate table id {} in cg {}",
451 table_id,
452 cg_id
453 );
454 }
455 }
456 }
457
458 pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
459 #[expect(deprecated)]
460 let mut init_version = HummockVersion {
461 id: FIRST_VERSION_ID,
462 levels: Default::default(),
463 max_committed_epoch: INVALID_EPOCH,
464 table_watermarks: HashMap::new(),
465 table_change_log: HashMap::new(),
466 state_table_info: HummockVersionStateTableInfo::empty(),
467 vector_indexes: Default::default(),
468 };
469 for group_id in [
470 StaticCompactionGroupId::StateDefault as CompactionGroupId,
471 StaticCompactionGroupId::MaterializedView as CompactionGroupId,
472 ] {
473 init_version.levels.insert(
474 group_id,
475 build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
476 );
477 }
478 init_version
479 }
480
481 pub fn version_delta_after(&self) -> HummockVersionDelta {
482 #[expect(deprecated)]
483 HummockVersionDelta {
484 id: self.next_version_id(),
485 prev_id: self.id,
486 trivial_move: false,
487 max_committed_epoch: self.max_committed_epoch,
488 group_deltas: Default::default(),
489 new_table_watermarks: HashMap::new(),
490 removed_table_ids: HashSet::new(),
491 change_log_delta: HashMap::new(),
492 state_table_info_delta: Default::default(),
493 vector_index_delta: Default::default(),
494 }
495 }
496
497 pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
498 let table_change_log = {
499 let mut table_change_log = HashMap::new();
500 for (table_id, log) in &mut self.table_change_log {
501 let change_log_iter =
502 log.change_log_iter_mut()
503 .map(|item| EpochNewChangeLogCommon {
504 new_value: std::mem::take(&mut item.new_value),
505 old_value: std::mem::take(&mut item.old_value),
506 non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
507 checkpoint_epoch: item.checkpoint_epoch,
508 });
509 table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
510 }
511
512 table_change_log
513 };
514
515 let local_version = LocalHummockVersion::from(self);
516
517 (local_version, table_change_log)
518 }
519}
520
521impl<T, L> HummockVersionCommon<T, L> {
522 pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
523 self.state_table_info
524 .info()
525 .get(&table_id)
526 .map(|info| info.committed_epoch)
527 }
528}
529
/// In-memory representation of a hummock version delta.
///
/// `T` is the SST info type stored in the group deltas and `L` is the SST info type stored in
/// the change log deltas; `L` defaults to `T`.
#[derive(Debug, PartialEq, Clone)]
pub struct HummockVersionDeltaCommon<T, L = T> {
    /// Id of the version delta.
    pub id: HummockVersionId,
    /// Id of the previous version.
    pub prev_id: HummockVersionId,
    /// Deltas of each compaction group, keyed by compaction group id.
    pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
    /// Deprecated field; it is still copied from and to the protobuf message.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    /// Copied verbatim from and to the protobuf field of the same name.
    pub trivial_move: bool,
    /// New table watermarks keyed by table id.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Ids of the removed tables.
    pub removed_table_ids: HashSet<TableId>,
    /// Change log deltas keyed by table id.
    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
    /// State table info deltas keyed by table id.
    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
    /// Vector index deltas keyed by table id.
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
}
544
545pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;
546
547pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
548
549impl Default for HummockVersionDelta {
550 fn default() -> Self {
551 HummockVersionDelta::from(&PbHummockVersionDelta::default())
552 }
553}
554
555impl<T> HummockVersionDeltaCommon<T>
556where
557 T: for<'a> From<&'a PbSstableInfo>,
558 PbSstableInfo: for<'a> From<&'a T>,
559{
560 pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
563 delta.into()
564 }
565
566 pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
569 delta.into()
570 }
571
572 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
573 self.into()
574 }
575}
576
/// Gives access to the SST id of a value.
pub trait SstableIdReader {
    /// Returns the SST id.
    fn sst_id(&self) -> HummockSstableId;
}
580
/// Gives access to the SST object id of a value.
pub trait ObjectIdReader {
    /// Returns the SST object id.
    fn object_id(&self) -> HummockSstableObjectId;
}
584
585impl<T> HummockVersionDeltaCommon<T>
586where
587 T: SstableIdReader + ObjectIdReader,
588{
589 pub fn newly_added_object_ids(
594 &self,
595 exclude_table_change_log: bool,
596 ) -> HashSet<HummockObjectId> {
597 match HummockObjectId::Sstable(0.into()) {
601 HummockObjectId::Sstable(_) => {}
602 HummockObjectId::VectorFile(_) => {}
603 HummockObjectId::HnswGraphFile(_) => {}
604 };
605 self.newly_added_sst_infos(exclude_table_change_log)
606 .map(|sst| HummockObjectId::Sstable(sst.object_id()))
607 .chain(
608 self.vector_index_delta
609 .values()
610 .flat_map(|vector_index_delta| {
611 vector_index_delta
612 .newly_added_objects()
613 .map(|(object_id, _)| object_id)
614 }),
615 )
616 .collect()
617 }
618
619 pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
620 self.newly_added_sst_infos(exclude_table_change_log)
621 .map(|sst| sst.sst_id())
622 .collect()
623 }
624}
625
626impl<T> HummockVersionDeltaCommon<T> {
627 pub fn newly_added_sst_infos(
628 &self,
629 exclude_table_change_log: bool,
630 ) -> impl Iterator<Item = &'_ T> {
631 let may_table_change_delta = if exclude_table_change_log {
632 None
633 } else {
634 Some(self.change_log_delta.values())
635 };
636 self.group_deltas
637 .values()
638 .flat_map(|group_deltas| {
639 group_deltas.group_deltas.iter().flat_map(|group_delta| {
640 let sst_slice = match &group_delta {
641 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
642 | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
643 inserted_table_infos,
644 ..
645 }) => Some(inserted_table_infos.iter()),
646 GroupDeltaCommon::GroupConstruct(_)
647 | GroupDeltaCommon::GroupDestroy(_)
648 | GroupDeltaCommon::GroupMerge(_)
649 | GroupDeltaCommon::TruncateTables(_) => None,
650 };
651 sst_slice.into_iter().flatten()
652 })
653 })
654 .chain(
655 may_table_change_delta
656 .map(|v| {
657 v.flat_map(|delta| {
658 let new_log = &delta.new_log;
659 new_log.new_value.iter().chain(new_log.old_value.iter())
660 })
661 })
662 .into_iter()
663 .flatten(),
664 )
665 }
666}
667
impl HummockVersionDelta {
    /// Returns the value of the deprecated `max_committed_epoch` field.
    ///
    /// NOTE(review): judging by the name, this accessor is meant for migration code only;
    /// confirm against callers before using it elsewhere.
    #[expect(deprecated)]
    pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
        self.max_committed_epoch
    }
}
674
675impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
676where
677 T: for<'a> From<&'a PbSstableInfo>,
678{
679 fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
680 #[expect(deprecated)]
681 Self {
682 id: HummockVersionId(pb_version_delta.id),
683 prev_id: HummockVersionId(pb_version_delta.prev_id),
684 group_deltas: pb_version_delta
685 .group_deltas
686 .iter()
687 .map(|(group_id, deltas)| {
688 (
689 *group_id as CompactionGroupId,
690 GroupDeltasCommon::from(deltas),
691 )
692 })
693 .collect(),
694 max_committed_epoch: pb_version_delta.max_committed_epoch,
695 trivial_move: pb_version_delta.trivial_move,
696 new_table_watermarks: pb_version_delta
697 .new_table_watermarks
698 .iter()
699 .map(|(table_id, watermarks)| {
700 (TableId::new(*table_id), TableWatermarks::from(watermarks))
701 })
702 .collect(),
703 removed_table_ids: pb_version_delta
704 .removed_table_ids
705 .iter()
706 .map(|table_id| TableId::new(*table_id))
707 .collect(),
708 change_log_delta: pb_version_delta
709 .change_log_delta
710 .iter()
711 .map(|(table_id, log_delta)| {
712 (
713 TableId::new(*table_id),
714 ChangeLogDeltaCommon {
715 truncate_epoch: log_delta.truncate_epoch,
716 new_log: log_delta.new_log.as_ref().unwrap().into(),
717 },
718 )
719 })
720 .collect(),
721
722 state_table_info_delta: pb_version_delta
723 .state_table_info_delta
724 .iter()
725 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
726 .collect(),
727 vector_index_delta: pb_version_delta
728 .vector_index_delta
729 .iter()
730 .map(|(table_id, delta)| (TableId::new(*table_id), delta.clone().into()))
731 .collect(),
732 }
733 }
734}
735
736impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
737where
738 PbSstableInfo: for<'a> From<&'a T>,
739{
740 fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
741 #[expect(deprecated)]
742 Self {
743 id: version_delta.id.0,
744 prev_id: version_delta.prev_id.0,
745 group_deltas: version_delta
746 .group_deltas
747 .iter()
748 .map(|(group_id, deltas)| (*group_id as _, deltas.into()))
749 .collect(),
750 max_committed_epoch: version_delta.max_committed_epoch,
751 trivial_move: version_delta.trivial_move,
752 new_table_watermarks: version_delta
753 .new_table_watermarks
754 .iter()
755 .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
756 .collect(),
757 removed_table_ids: version_delta
758 .removed_table_ids
759 .iter()
760 .map(|table_id| table_id.table_id)
761 .collect(),
762 change_log_delta: version_delta
763 .change_log_delta
764 .iter()
765 .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
766 .collect(),
767 state_table_info_delta: version_delta
768 .state_table_info_delta
769 .iter()
770 .map(|(table_id, delta)| (table_id.table_id, *delta))
771 .collect(),
772 vector_index_delta: version_delta
773 .vector_index_delta
774 .iter()
775 .map(|(table_id, delta)| (table_id.table_id, delta.clone().into()))
776 .collect(),
777 }
778 }
779}
780
781impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
782where
783 PbSstableInfo: From<T>,
784{
785 fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
786 #[expect(deprecated)]
787 Self {
788 id: version_delta.id.0,
789 prev_id: version_delta.prev_id.0,
790 group_deltas: version_delta
791 .group_deltas
792 .into_iter()
793 .map(|(group_id, deltas)| (group_id as _, deltas.into()))
794 .collect(),
795 max_committed_epoch: version_delta.max_committed_epoch,
796 trivial_move: version_delta.trivial_move,
797 new_table_watermarks: version_delta
798 .new_table_watermarks
799 .into_iter()
800 .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
801 .collect(),
802 removed_table_ids: version_delta
803 .removed_table_ids
804 .into_iter()
805 .map(|table_id| table_id.table_id)
806 .collect(),
807 change_log_delta: version_delta
808 .change_log_delta
809 .into_iter()
810 .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
811 .collect(),
812 state_table_info_delta: version_delta
813 .state_table_info_delta
814 .into_iter()
815 .map(|(table_id, delta)| (table_id.table_id, delta))
816 .collect(),
817 vector_index_delta: version_delta
818 .vector_index_delta
819 .into_iter()
820 .map(|(table_id, delta)| (table_id.table_id, delta.into()))
821 .collect(),
822 }
823 }
824}
825
826impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
827where
828 T: From<PbSstableInfo>,
829{
830 fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
831 #[expect(deprecated)]
832 Self {
833 id: HummockVersionId(pb_version_delta.id),
834 prev_id: HummockVersionId(pb_version_delta.prev_id),
835 group_deltas: pb_version_delta
836 .group_deltas
837 .into_iter()
838 .map(|(group_id, deltas)| (group_id as CompactionGroupId, deltas.into()))
839 .collect(),
840 max_committed_epoch: pb_version_delta.max_committed_epoch,
841 trivial_move: pb_version_delta.trivial_move,
842 new_table_watermarks: pb_version_delta
843 .new_table_watermarks
844 .into_iter()
845 .map(|(table_id, watermarks)| (TableId::new(table_id), watermarks.into()))
846 .collect(),
847 removed_table_ids: pb_version_delta
848 .removed_table_ids
849 .into_iter()
850 .map(TableId::new)
851 .collect(),
852 change_log_delta: pb_version_delta
853 .change_log_delta
854 .iter()
855 .map(|(table_id, log_delta)| {
856 (
857 TableId::new(*table_id),
858 ChangeLogDeltaCommon {
859 new_log: log_delta.new_log.clone().unwrap().into(),
860 truncate_epoch: log_delta.truncate_epoch,
861 },
862 )
863 })
864 .collect(),
865 state_table_info_delta: pb_version_delta
866 .state_table_info_delta
867 .iter()
868 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
869 .collect(),
870 vector_index_delta: pb_version_delta
871 .vector_index_delta
872 .into_iter()
873 .map(|(table_id, delta)| (TableId::new(table_id), delta.into()))
874 .collect(),
875 }
876 }
877}
878
/// In-memory counterpart of `PbIntraLevelDelta`, generic over the SST info type `T`.
///
/// Every field is converted one-to-one from and to the protobuf field of the same name.
#[derive(Debug, PartialEq, Clone)]
pub struct IntraLevelDeltaCommon<T> {
    pub level_idx: u32,
    pub l0_sub_level_id: u64,
    /// Ids of the removed SSTs.
    pub removed_table_ids: HashSet<HummockSstableId>,
    /// Infos of the inserted SSTs.
    pub inserted_table_infos: Vec<T>,
    pub vnode_partition_count: u32,
    pub compaction_group_version_id: u64,
}
888
889pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
890
891impl IntraLevelDelta {
892 pub fn estimated_encode_len(&self) -> usize {
893 size_of::<u32>()
894 + size_of::<u64>()
895 + self.removed_table_ids.len() * size_of::<u32>()
896 + self
897 .inserted_table_infos
898 .iter()
899 .map(|sst| sst.estimated_encode_len())
900 .sum::<usize>()
901 + size_of::<u32>()
902 }
903}
904
905impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
906where
907 T: From<PbSstableInfo>,
908{
909 fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
910 Self {
911 level_idx: pb_intra_level_delta.level_idx,
912 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
913 removed_table_ids: HashSet::from_iter(
914 pb_intra_level_delta
915 .removed_table_ids
916 .iter()
917 .map(|sst_id| (*sst_id).into()),
918 ),
919 inserted_table_infos: pb_intra_level_delta
920 .inserted_table_infos
921 .into_iter()
922 .map(Into::into)
923 .collect_vec(),
924 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
925 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
926 }
927 }
928}
929
930impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
931where
932 PbSstableInfo: From<T>,
933{
934 fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
935 Self {
936 level_idx: intra_level_delta.level_idx,
937 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
938 removed_table_ids: intra_level_delta
939 .removed_table_ids
940 .into_iter()
941 .map(|sst_id| sst_id.inner())
942 .collect(),
943 inserted_table_infos: intra_level_delta
944 .inserted_table_infos
945 .into_iter()
946 .map(Into::into)
947 .collect_vec(),
948 vnode_partition_count: intra_level_delta.vnode_partition_count,
949 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
950 }
951 }
952}
953
954impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
955where
956 PbSstableInfo: for<'a> From<&'a T>,
957{
958 fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
959 Self {
960 level_idx: intra_level_delta.level_idx,
961 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
962 removed_table_ids: intra_level_delta
963 .removed_table_ids
964 .iter()
965 .map(|sst_id| sst_id.inner())
966 .collect(),
967 inserted_table_infos: intra_level_delta
968 .inserted_table_infos
969 .iter()
970 .map(Into::into)
971 .collect_vec(),
972 vnode_partition_count: intra_level_delta.vnode_partition_count,
973 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
974 }
975 }
976}
977
978impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
979where
980 T: for<'a> From<&'a PbSstableInfo>,
981{
982 fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
983 Self {
984 level_idx: pb_intra_level_delta.level_idx,
985 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
986 removed_table_ids: HashSet::from_iter(
987 pb_intra_level_delta
988 .removed_table_ids
989 .iter()
990 .map(|sst_id| (*sst_id).into()),
991 ),
992 inserted_table_infos: pb_intra_level_delta
993 .inserted_table_infos
994 .iter()
995 .map(Into::into)
996 .collect_vec(),
997 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
998 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
999 }
1000 }
1001}
1002
1003impl IntraLevelDelta {
1004 pub fn new(
1005 level_idx: u32,
1006 l0_sub_level_id: u64,
1007 removed_table_ids: HashSet<HummockSstableId>,
1008 inserted_table_infos: Vec<SstableInfo>,
1009 vnode_partition_count: u32,
1010 compaction_group_version_id: u64,
1011 ) -> Self {
1012 Self {
1013 level_idx,
1014 l0_sub_level_id,
1015 removed_table_ids,
1016 inserted_table_infos,
1017 vnode_partition_count,
1018 compaction_group_version_id,
1019 }
1020 }
1021}
1022
/// In-memory counterpart of the `delta_type` of a `PbGroupDelta`, generic over the SST info
/// type `T`.
#[derive(Debug, PartialEq, Clone)]
pub enum GroupDeltaCommon<T> {
    /// Carries the SST infos listed in `inserted_table_infos` of the protobuf `NewL0SubLevel`.
    NewL0SubLevel(Vec<T>),
    /// Carries an intra-level delta.
    IntraLevel(IntraLevelDeltaCommon<T>),
    /// Carries the protobuf group-construct message unchanged (boxed).
    GroupConstruct(Box<PbGroupConstruct>),
    /// Carries the protobuf group-destroy message unchanged.
    GroupDestroy(PbGroupDestroy),
    /// Carries the protobuf group-merge message unchanged.
    GroupMerge(PbGroupMerge),
    /// Carries the `table_ids` of the protobuf `TruncateTables` message.
    TruncateTables(Vec<u32>),
}

/// Group delta that holds [`SstableInfo`].
pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
1034
1035impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
1036where
1037 T: From<PbSstableInfo>,
1038{
1039 fn from(pb_group_delta: PbGroupDelta) -> Self {
1040 match pb_group_delta.delta_type {
1041 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1042 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1043 }
1044 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1045 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
1046 }
1047 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1048 GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1049 }
1050 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1051 GroupDeltaCommon::GroupMerge(pb_group_merge)
1052 }
1053 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1054 pb_new_sub_level
1055 .inserted_table_infos
1056 .into_iter()
1057 .map(T::from)
1058 .collect(),
1059 ),
1060 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1061 GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids)
1062 }
1063
1064 None => panic!("delta_type is not set"),
1065 }
1066 }
1067}
1068
1069impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1070where
1071 PbSstableInfo: From<T>,
1072{
1073 fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1074 match group_delta {
1075 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1076 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1077 },
1078 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1079 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1080 },
1081 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1082 delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1083 },
1084 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1085 delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1086 },
1087 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1088 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1089 inserted_table_infos: new_sub_level
1090 .into_iter()
1091 .map(PbSstableInfo::from)
1092 .collect(),
1093 })),
1094 },
1095 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1096 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables { table_ids })),
1097 },
1098 }
1099 }
1100}
1101
1102impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1103where
1104 PbSstableInfo: for<'a> From<&'a T>,
1105{
1106 fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1107 match group_delta {
1108 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1109 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1110 },
1111 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1112 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1113 },
1114 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1115 delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1116 },
1117 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1118 delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1119 },
1120 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1121 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1122 inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1123 })),
1124 },
1125 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1126 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1127 table_ids: table_ids.clone(),
1128 })),
1129 },
1130 }
1131 }
1132}
1133
1134impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1135where
1136 T: for<'a> From<&'a PbSstableInfo>,
1137{
1138 fn from(pb_group_delta: &PbGroupDelta) -> Self {
1139 match &pb_group_delta.delta_type {
1140 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1141 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1142 }
1143 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1144 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1145 }
1146 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1147 GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1148 }
1149 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1150 GroupDeltaCommon::GroupMerge(*pb_group_merge)
1151 }
1152 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1153 pb_new_sub_level
1154 .inserted_table_infos
1155 .iter()
1156 .map(T::from)
1157 .collect(),
1158 ),
1159 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1160 GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids.clone())
1161 }
1162 None => panic!("delta_type is not set"),
1163 }
1164 }
1165}
1166
1167#[derive(Debug, PartialEq, Clone)]
1168pub struct GroupDeltasCommon<T> {
1169 pub group_deltas: Vec<GroupDeltaCommon<T>>,
1170}
1171
1172impl<T> Default for GroupDeltasCommon<T> {
1173 fn default() -> Self {
1174 Self {
1175 group_deltas: vec![],
1176 }
1177 }
1178}
1179
1180pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1181
1182impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1183where
1184 T: From<PbSstableInfo>,
1185{
1186 fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1187 Self {
1188 group_deltas: pb_group_deltas
1189 .group_deltas
1190 .into_iter()
1191 .map(GroupDeltaCommon::from)
1192 .collect_vec(),
1193 }
1194 }
1195}
1196
1197impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1198where
1199 PbSstableInfo: From<T>,
1200{
1201 fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1202 Self {
1203 group_deltas: group_deltas
1204 .group_deltas
1205 .into_iter()
1206 .map(|group_delta| group_delta.into())
1207 .collect_vec(),
1208 }
1209 }
1210}
1211
1212impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1213where
1214 PbSstableInfo: for<'a> From<&'a T>,
1215{
1216 fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1217 Self {
1218 group_deltas: group_deltas
1219 .group_deltas
1220 .iter()
1221 .map(|group_delta| group_delta.into())
1222 .collect_vec(),
1223 }
1224 }
1225}
1226
1227impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1228where
1229 T: for<'a> From<&'a PbSstableInfo>,
1230{
1231 fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1232 Self {
1233 group_deltas: pb_group_deltas
1234 .group_deltas
1235 .iter()
1236 .map(GroupDeltaCommon::from)
1237 .collect_vec(),
1238 }
1239 }
1240}
1241
1242impl<T> GroupDeltasCommon<T>
1243where
1244 PbSstableInfo: for<'a> From<&'a T>,
1245{
1246 pub fn to_protobuf(&self) -> PbGroupDeltas {
1247 self.into()
1248 }
1249}
1250
1251impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1252 #[expect(deprecated)]
1253 fn from(delta: HummockVersionDelta) -> Self {
1254 Self {
1255 id: delta.id,
1256 prev_id: delta.prev_id,
1257 group_deltas: delta.group_deltas,
1258 max_committed_epoch: delta.max_committed_epoch,
1259 trivial_move: delta.trivial_move,
1260 new_table_watermarks: delta.new_table_watermarks,
1261 removed_table_ids: delta.removed_table_ids,
1262 change_log_delta: delta
1263 .change_log_delta
1264 .into_iter()
1265 .map(|(k, v)| {
1266 (
1267 k,
1268 ChangeLogDeltaCommon {
1269 truncate_epoch: v.truncate_epoch,
1270 new_log: EpochNewChangeLogCommon {
1271 new_value: Vec::new(),
1272 old_value: Vec::new(),
1273 non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1274 checkpoint_epoch: v.new_log.checkpoint_epoch,
1275 },
1276 },
1277 )
1278 })
1279 .collect(),
1280 state_table_info_delta: delta.state_table_info_delta,
1281 vector_index_delta: delta.vector_index_delta,
1282 }
1283 }
1284}
1285
1286impl From<HummockVersion> for LocalHummockVersion {
1287 #[expect(deprecated)]
1288 fn from(version: HummockVersion) -> Self {
1289 Self {
1290 id: version.id,
1291 levels: version.levels,
1292 max_committed_epoch: version.max_committed_epoch,
1293 table_watermarks: version.table_watermarks,
1294 table_change_log: version
1295 .table_change_log
1296 .into_iter()
1297 .map(|(k, v)| {
1298 let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1299 .change_log_into_iter()
1300 .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1301 new_value: Vec::new(),
1302 old_value: Vec::new(),
1303 non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1304 checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1305 })
1306 .collect();
1307 (k, TableChangeLogCommon::new(epoch_new_change_logs))
1308 })
1309 .collect(),
1310 state_table_info: version.state_table_info,
1311 vector_indexes: version.vector_indexes,
1312 }
1313 }
1314}