1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{
30 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
31};
32use crate::compaction_group::StaticCompactionGroupId;
33use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
34use crate::level::LevelsCommon;
35use crate::sstable_info::SstableInfo;
36use crate::table_watermark::TableWatermarks;
37use crate::vector_index::{VectorIndex, VectorIndexDelta};
38use crate::{
39 CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
40 HummockSstableObjectId, HummockVersionId,
41};
42
43#[derive(Debug, Clone, PartialEq)]
44pub struct HummockVersionStateTableInfo {
45 state_table_info: HashMap<TableId, PbStateTableInfo>,
46
47 compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
49}
50
51impl HummockVersionStateTableInfo {
52 pub fn empty() -> Self {
53 Self {
54 state_table_info: HashMap::new(),
55 compaction_group_member_tables: HashMap::new(),
56 }
57 }
58
59 fn build_compaction_group_member_tables(
60 state_table_info: &HashMap<TableId, PbStateTableInfo>,
61 ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
62 let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
63 for (table_id, info) in state_table_info {
64 assert!(
65 ret.entry(info.compaction_group_id)
66 .or_default()
67 .insert(*table_id)
68 );
69 }
70 ret
71 }
72
73 pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
74 self.state_table_info
75 .iter()
76 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
77 .collect()
78 }
79
80 pub fn from_protobuf(state_table_info: &HashMap<u32, PbStateTableInfo>) -> Self {
81 let state_table_info = state_table_info
82 .iter()
83 .map(|(table_id, info)| (TableId::new(*table_id), *info))
84 .collect();
85 let compaction_group_member_tables =
86 Self::build_compaction_group_member_tables(&state_table_info);
87 Self {
88 state_table_info,
89 compaction_group_member_tables,
90 }
91 }
92
93 pub fn to_protobuf(&self) -> HashMap<u32, PbStateTableInfo> {
94 self.state_table_info
95 .iter()
96 .map(|(table_id, info)| (table_id.table_id, *info))
97 .collect()
98 }
99
100 pub fn apply_delta(
101 &mut self,
102 delta: &HashMap<TableId, StateTableInfoDelta>,
103 removed_table_id: &HashSet<TableId>,
104 ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
105 let mut changed_table = HashMap::new();
106 let mut has_bumped_committed_epoch = false;
107 fn remove_table_from_compaction_group(
108 compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
109 compaction_group_id: CompactionGroupId,
110 table_id: TableId,
111 ) {
112 let member_tables = compaction_group_member_tables
113 .get_mut(&compaction_group_id)
114 .expect("should exist");
115 assert!(member_tables.remove(&table_id));
116 if member_tables.is_empty() {
117 assert!(
118 compaction_group_member_tables
119 .remove(&compaction_group_id)
120 .is_some()
121 );
122 }
123 }
124 for table_id in removed_table_id {
125 if let Some(prev_info) = self.state_table_info.remove(table_id) {
126 remove_table_from_compaction_group(
127 &mut self.compaction_group_member_tables,
128 prev_info.compaction_group_id,
129 *table_id,
130 );
131 assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
132 } else {
133 warn!(
134 table_id = table_id.table_id,
135 "table to remove does not exist"
136 );
137 }
138 }
139 for (table_id, delta) in delta {
140 if removed_table_id.contains(table_id) {
141 continue;
142 }
143 let new_info = StateTableInfo {
144 committed_epoch: delta.committed_epoch,
145 compaction_group_id: delta.compaction_group_id,
146 };
147 match self.state_table_info.entry(*table_id) {
148 Entry::Occupied(mut entry) => {
149 let prev_info = entry.get_mut();
150 assert!(
151 new_info.committed_epoch >= prev_info.committed_epoch,
152 "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
153 table_id.table_id,
154 prev_info,
155 new_info
156 );
157 if new_info.committed_epoch > prev_info.committed_epoch {
158 has_bumped_committed_epoch = true;
159 }
160 if prev_info.compaction_group_id != new_info.compaction_group_id {
161 remove_table_from_compaction_group(
163 &mut self.compaction_group_member_tables,
164 prev_info.compaction_group_id,
165 *table_id,
166 );
167 assert!(
168 self.compaction_group_member_tables
169 .entry(new_info.compaction_group_id)
170 .or_default()
171 .insert(*table_id)
172 );
173 }
174 let prev_info = replace(prev_info, new_info);
175 changed_table.insert(*table_id, Some(prev_info));
176 }
177 Entry::Vacant(entry) => {
178 assert!(
179 self.compaction_group_member_tables
180 .entry(new_info.compaction_group_id)
181 .or_default()
182 .insert(*table_id)
183 );
184 has_bumped_committed_epoch = true;
185 entry.insert(new_info);
186 changed_table.insert(*table_id, None);
187 }
188 }
189 }
190 debug_assert_eq!(
191 self.compaction_group_member_tables,
192 Self::build_compaction_group_member_tables(&self.state_table_info)
193 );
194 (changed_table, has_bumped_committed_epoch)
195 }
196
197 pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
198 &self.state_table_info
199 }
200
201 pub fn compaction_group_member_table_ids(
202 &self,
203 compaction_group_id: CompactionGroupId,
204 ) -> &BTreeSet<TableId> {
205 static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
206 self.compaction_group_member_tables
207 .get(&compaction_group_id)
208 .unwrap_or_else(|| EMPTY_SET.deref())
209 }
210
211 pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
212 &self.compaction_group_member_tables
213 }
214
215 pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
216 self.state_table_info
217 .values()
218 .map(|info| info.committed_epoch)
219 .max()
220 }
221}
222
223#[derive(Debug, Clone, PartialEq)]
224pub struct HummockVersionCommon<T, L = T> {
225 pub id: HummockVersionId,
226 pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
227 #[deprecated]
228 pub(crate) max_committed_epoch: u64,
229 pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
230 pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
231 pub state_table_info: HummockVersionStateTableInfo,
232 pub vector_indexes: HashMap<TableId, VectorIndex>,
233}
234
235pub type HummockVersion = HummockVersionCommon<SstableInfo>;
236
237pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
238
239impl Default for HummockVersion {
240 fn default() -> Self {
241 HummockVersion::from(&PbHummockVersion::default())
242 }
243}
244
245impl<T> HummockVersionCommon<T>
246where
247 T: for<'a> From<&'a PbSstableInfo>,
248 PbSstableInfo: for<'a> From<&'a T>,
249{
250 pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
253 pb_version.into()
254 }
255
256 pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
259 pb_version.into()
260 }
261
262 pub fn to_protobuf(&self) -> PbHummockVersion {
263 self.into()
264 }
265}
266
267impl HummockVersion {
268 pub fn estimated_encode_len(&self) -> usize {
269 self.levels.len() * size_of::<CompactionGroupId>()
270 + self
271 .levels
272 .values()
273 .map(|level| level.estimated_encode_len())
274 .sum::<usize>()
275 + self.table_watermarks.len() * size_of::<u32>()
276 + self
277 .table_watermarks
278 .values()
279 .map(|table_watermark| table_watermark.estimated_encode_len())
280 .sum::<usize>()
281 + self
282 .table_change_log
283 .values()
284 .map(|c| {
285 c.iter()
286 .map(|l| {
287 l.old_value
288 .iter()
289 .chain(l.new_value.iter())
290 .map(|s| s.estimated_encode_len())
291 .sum::<usize>()
292 })
293 .sum::<usize>()
294 })
295 .sum::<usize>()
296 }
297}
298
299impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
300where
301 T: for<'a> From<&'a PbSstableInfo>,
302{
303 fn from(pb_version: &PbHummockVersion) -> Self {
304 #[expect(deprecated)]
305 Self {
306 id: HummockVersionId(pb_version.id),
307 levels: pb_version
308 .levels
309 .iter()
310 .map(|(group_id, levels)| {
311 (*group_id as CompactionGroupId, LevelsCommon::from(levels))
312 })
313 .collect(),
314 max_committed_epoch: pb_version.max_committed_epoch,
315 table_watermarks: pb_version
316 .table_watermarks
317 .iter()
318 .map(|(table_id, table_watermark)| {
319 (
320 TableId::new(*table_id),
321 Arc::new(TableWatermarks::from(table_watermark)),
322 )
323 })
324 .collect(),
325 table_change_log: pb_version
326 .table_change_logs
327 .iter()
328 .map(|(table_id, change_log)| {
329 (
330 TableId::new(*table_id),
331 TableChangeLogCommon::from_protobuf(change_log),
332 )
333 })
334 .collect(),
335 state_table_info: HummockVersionStateTableInfo::from_protobuf(
336 &pb_version.state_table_info,
337 ),
338 vector_indexes: pb_version
339 .vector_indexes
340 .iter()
341 .map(|(table_id, index)| (table_id.into(), index.clone().into()))
342 .collect(),
343 }
344 }
345}
346
347impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
348where
349 PbSstableInfo: for<'a> From<&'a T>,
350{
351 fn from(version: &HummockVersionCommon<T>) -> Self {
352 #[expect(deprecated)]
353 Self {
354 id: version.id.0,
355 levels: version
356 .levels
357 .iter()
358 .map(|(group_id, levels)| (*group_id as _, levels.into()))
359 .collect(),
360 max_committed_epoch: version.max_committed_epoch,
361 table_watermarks: version
362 .table_watermarks
363 .iter()
364 .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
365 .collect(),
366 table_change_logs: version
367 .table_change_log
368 .iter()
369 .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
370 .collect(),
371 state_table_info: version.state_table_info.to_protobuf(),
372 vector_indexes: version
373 .vector_indexes
374 .iter()
375 .map(|(table_id, index)| (table_id.table_id, index.clone().into()))
376 .collect(),
377 }
378 }
379}
380
381impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
382where
383 PbSstableInfo: From<T>,
384 PbSstableInfo: for<'a> From<&'a T>,
385{
386 fn from(version: HummockVersionCommon<T>) -> Self {
387 #[expect(deprecated)]
388 Self {
389 id: version.id.0,
390 levels: version
391 .levels
392 .into_iter()
393 .map(|(group_id, levels)| (group_id as _, levels.into()))
394 .collect(),
395 max_committed_epoch: version.max_committed_epoch,
396 table_watermarks: version
397 .table_watermarks
398 .into_iter()
399 .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
400 .collect(),
401 table_change_logs: version
402 .table_change_log
403 .into_iter()
404 .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
405 .collect(),
406 state_table_info: version.state_table_info.to_protobuf(),
407 vector_indexes: version
408 .vector_indexes
409 .into_iter()
410 .map(|(table_id, index)| (table_id.table_id, index.into()))
411 .collect(),
412 }
413 }
414}
415
416impl HummockVersion {
417 pub fn next_version_id(&self) -> HummockVersionId {
418 self.id.next()
419 }
420
421 pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
422 self.state_table_info.state_table_info.is_empty()
424 && self.levels.values().any(|group| {
425 #[expect(deprecated)]
427 !group.member_table_ids.is_empty()
428 })
429 }
430
431 pub fn may_fill_backward_compatible_state_table_info_delta(
432 &self,
433 delta: &mut HummockVersionDelta,
434 ) {
435 #[expect(deprecated)]
436 for (cg_id, group) in &self.levels {
438 for table_id in &group.member_table_ids {
439 assert!(
440 delta
441 .state_table_info_delta
442 .insert(
443 TableId::new(*table_id),
444 StateTableInfoDelta {
445 committed_epoch: self.max_committed_epoch,
446 compaction_group_id: *cg_id,
447 }
448 )
449 .is_none(),
450 "duplicate table id {} in cg {}",
451 table_id,
452 cg_id
453 );
454 }
455 }
456 }
457
458 pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
459 #[expect(deprecated)]
460 let mut init_version = HummockVersion {
461 id: FIRST_VERSION_ID,
462 levels: Default::default(),
463 max_committed_epoch: INVALID_EPOCH,
464 table_watermarks: HashMap::new(),
465 table_change_log: HashMap::new(),
466 state_table_info: HummockVersionStateTableInfo::empty(),
467 vector_indexes: Default::default(),
468 };
469 for group_id in [
470 StaticCompactionGroupId::StateDefault as CompactionGroupId,
471 StaticCompactionGroupId::MaterializedView as CompactionGroupId,
472 ] {
473 init_version.levels.insert(
474 group_id,
475 build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
476 );
477 }
478 init_version
479 }
480
481 pub fn version_delta_after(&self) -> HummockVersionDelta {
482 #[expect(deprecated)]
483 HummockVersionDelta {
484 id: self.next_version_id(),
485 prev_id: self.id,
486 trivial_move: false,
487 max_committed_epoch: self.max_committed_epoch,
488 group_deltas: Default::default(),
489 new_table_watermarks: HashMap::new(),
490 removed_table_ids: HashSet::new(),
491 change_log_delta: HashMap::new(),
492 state_table_info_delta: Default::default(),
493 vector_index_delta: Default::default(),
494 }
495 }
496
497 pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
498 let table_change_log = {
499 let mut table_change_log = HashMap::new();
500 for (table_id, log) in &mut self.table_change_log {
501 let change_log_iter =
502 log.change_log_iter_mut()
503 .map(|item| EpochNewChangeLogCommon {
504 new_value: std::mem::take(&mut item.new_value),
505 old_value: std::mem::take(&mut item.old_value),
506 non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
507 checkpoint_epoch: item.checkpoint_epoch,
508 });
509 table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
510 }
511
512 table_change_log
513 };
514
515 let local_version = LocalHummockVersion::from(self);
516
517 (local_version, table_change_log)
518 }
519}
520
521impl<T, L> HummockVersionCommon<T, L> {
522 pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
523 self.state_table_info
524 .info()
525 .get(&table_id)
526 .map(|info| info.committed_epoch)
527 }
528}
529
530#[derive(Debug, PartialEq, Clone)]
531pub struct HummockVersionDeltaCommon<T, L = T> {
532 pub id: HummockVersionId,
533 pub prev_id: HummockVersionId,
534 pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
535 #[deprecated]
536 pub(crate) max_committed_epoch: u64,
537 pub trivial_move: bool,
538 pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
539 pub removed_table_ids: HashSet<TableId>,
540 pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
541 pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
542 pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
543}
544
545pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;
546
547pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
548
549impl Default for HummockVersionDelta {
550 fn default() -> Self {
551 HummockVersionDelta::from(&PbHummockVersionDelta::default())
552 }
553}
554
555impl<T> HummockVersionDeltaCommon<T>
556where
557 T: for<'a> From<&'a PbSstableInfo>,
558 PbSstableInfo: for<'a> From<&'a T>,
559{
560 pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
563 delta.into()
564 }
565
566 pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
569 delta.into()
570 }
571
572 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
573 self.into()
574 }
575}
576
577pub trait SstableIdReader {
578 fn sst_id(&self) -> HummockSstableId;
579}
580
581pub trait ObjectIdReader {
582 fn object_id(&self) -> HummockSstableObjectId;
583}
584
585impl<T> HummockVersionDeltaCommon<T>
586where
587 T: SstableIdReader + ObjectIdReader,
588{
589 pub fn newly_added_object_ids(
594 &self,
595 exclude_table_change_log: bool,
596 ) -> HashSet<HummockObjectId> {
597 match HummockObjectId::Sstable(0.into()) {
601 HummockObjectId::Sstable(_) => {}
602 HummockObjectId::VectorFile(_) => {}
603 };
604 self.newly_added_sst_infos(exclude_table_change_log)
605 .map(|sst| HummockObjectId::Sstable(sst.object_id()))
606 .chain(
607 self.vector_index_delta
608 .values()
609 .flat_map(|vector_index_delta| {
610 vector_index_delta
611 .newly_added_objects()
612 .map(|(object_id, _)| object_id)
613 }),
614 )
615 .collect()
616 }
617
618 pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
619 self.newly_added_sst_infos(exclude_table_change_log)
620 .map(|sst| sst.sst_id())
621 .collect()
622 }
623}
624
625impl<T> HummockVersionDeltaCommon<T> {
626 pub fn newly_added_sst_infos(
627 &self,
628 exclude_table_change_log: bool,
629 ) -> impl Iterator<Item = &'_ T> {
630 let may_table_change_delta = if exclude_table_change_log {
631 None
632 } else {
633 Some(self.change_log_delta.values())
634 };
635 self.group_deltas
636 .values()
637 .flat_map(|group_deltas| {
638 group_deltas.group_deltas.iter().flat_map(|group_delta| {
639 let sst_slice = match &group_delta {
640 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
641 | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
642 inserted_table_infos,
643 ..
644 }) => Some(inserted_table_infos.iter()),
645 GroupDeltaCommon::GroupConstruct(_)
646 | GroupDeltaCommon::GroupDestroy(_)
647 | GroupDeltaCommon::GroupMerge(_) => None,
648 };
649 sst_slice.into_iter().flatten()
650 })
651 })
652 .chain(
653 may_table_change_delta
654 .map(|v| {
655 v.flat_map(|delta| {
656 let new_log = &delta.new_log;
657 new_log.new_value.iter().chain(new_log.old_value.iter())
658 })
659 })
660 .into_iter()
661 .flatten(),
662 )
663 }
664}
665
666impl HummockVersionDelta {
667 #[expect(deprecated)]
668 pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
669 self.max_committed_epoch
670 }
671}
672
673impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
674where
675 T: for<'a> From<&'a PbSstableInfo>,
676{
677 fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
678 #[expect(deprecated)]
679 Self {
680 id: HummockVersionId(pb_version_delta.id),
681 prev_id: HummockVersionId(pb_version_delta.prev_id),
682 group_deltas: pb_version_delta
683 .group_deltas
684 .iter()
685 .map(|(group_id, deltas)| {
686 (
687 *group_id as CompactionGroupId,
688 GroupDeltasCommon::from(deltas),
689 )
690 })
691 .collect(),
692 max_committed_epoch: pb_version_delta.max_committed_epoch,
693 trivial_move: pb_version_delta.trivial_move,
694 new_table_watermarks: pb_version_delta
695 .new_table_watermarks
696 .iter()
697 .map(|(table_id, watermarks)| {
698 (TableId::new(*table_id), TableWatermarks::from(watermarks))
699 })
700 .collect(),
701 removed_table_ids: pb_version_delta
702 .removed_table_ids
703 .iter()
704 .map(|table_id| TableId::new(*table_id))
705 .collect(),
706 change_log_delta: pb_version_delta
707 .change_log_delta
708 .iter()
709 .map(|(table_id, log_delta)| {
710 (
711 TableId::new(*table_id),
712 ChangeLogDeltaCommon {
713 truncate_epoch: log_delta.truncate_epoch,
714 new_log: log_delta.new_log.as_ref().unwrap().into(),
715 },
716 )
717 })
718 .collect(),
719
720 state_table_info_delta: pb_version_delta
721 .state_table_info_delta
722 .iter()
723 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
724 .collect(),
725 vector_index_delta: pb_version_delta
726 .vector_index_delta
727 .iter()
728 .map(|(table_id, delta)| (TableId::new(*table_id), delta.clone().into()))
729 .collect(),
730 }
731 }
732}
733
734impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
735where
736 PbSstableInfo: for<'a> From<&'a T>,
737{
738 fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
739 #[expect(deprecated)]
740 Self {
741 id: version_delta.id.0,
742 prev_id: version_delta.prev_id.0,
743 group_deltas: version_delta
744 .group_deltas
745 .iter()
746 .map(|(group_id, deltas)| (*group_id as _, deltas.into()))
747 .collect(),
748 max_committed_epoch: version_delta.max_committed_epoch,
749 trivial_move: version_delta.trivial_move,
750 new_table_watermarks: version_delta
751 .new_table_watermarks
752 .iter()
753 .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
754 .collect(),
755 removed_table_ids: version_delta
756 .removed_table_ids
757 .iter()
758 .map(|table_id| table_id.table_id)
759 .collect(),
760 change_log_delta: version_delta
761 .change_log_delta
762 .iter()
763 .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
764 .collect(),
765 state_table_info_delta: version_delta
766 .state_table_info_delta
767 .iter()
768 .map(|(table_id, delta)| (table_id.table_id, *delta))
769 .collect(),
770 vector_index_delta: version_delta
771 .vector_index_delta
772 .iter()
773 .map(|(table_id, delta)| (table_id.table_id, delta.clone().into()))
774 .collect(),
775 }
776 }
777}
778
779impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
780where
781 PbSstableInfo: From<T>,
782{
783 fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
784 #[expect(deprecated)]
785 Self {
786 id: version_delta.id.0,
787 prev_id: version_delta.prev_id.0,
788 group_deltas: version_delta
789 .group_deltas
790 .into_iter()
791 .map(|(group_id, deltas)| (group_id as _, deltas.into()))
792 .collect(),
793 max_committed_epoch: version_delta.max_committed_epoch,
794 trivial_move: version_delta.trivial_move,
795 new_table_watermarks: version_delta
796 .new_table_watermarks
797 .into_iter()
798 .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
799 .collect(),
800 removed_table_ids: version_delta
801 .removed_table_ids
802 .into_iter()
803 .map(|table_id| table_id.table_id)
804 .collect(),
805 change_log_delta: version_delta
806 .change_log_delta
807 .into_iter()
808 .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
809 .collect(),
810 state_table_info_delta: version_delta
811 .state_table_info_delta
812 .into_iter()
813 .map(|(table_id, delta)| (table_id.table_id, delta))
814 .collect(),
815 vector_index_delta: version_delta
816 .vector_index_delta
817 .into_iter()
818 .map(|(table_id, delta)| (table_id.table_id, delta.into()))
819 .collect(),
820 }
821 }
822}
823
824impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
825where
826 T: From<PbSstableInfo>,
827{
828 fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
829 #[expect(deprecated)]
830 Self {
831 id: HummockVersionId(pb_version_delta.id),
832 prev_id: HummockVersionId(pb_version_delta.prev_id),
833 group_deltas: pb_version_delta
834 .group_deltas
835 .into_iter()
836 .map(|(group_id, deltas)| (group_id as CompactionGroupId, deltas.into()))
837 .collect(),
838 max_committed_epoch: pb_version_delta.max_committed_epoch,
839 trivial_move: pb_version_delta.trivial_move,
840 new_table_watermarks: pb_version_delta
841 .new_table_watermarks
842 .into_iter()
843 .map(|(table_id, watermarks)| (TableId::new(table_id), watermarks.into()))
844 .collect(),
845 removed_table_ids: pb_version_delta
846 .removed_table_ids
847 .into_iter()
848 .map(TableId::new)
849 .collect(),
850 change_log_delta: pb_version_delta
851 .change_log_delta
852 .iter()
853 .map(|(table_id, log_delta)| {
854 (
855 TableId::new(*table_id),
856 ChangeLogDeltaCommon {
857 new_log: log_delta.new_log.clone().unwrap().into(),
858 truncate_epoch: log_delta.truncate_epoch,
859 },
860 )
861 })
862 .collect(),
863 state_table_info_delta: pb_version_delta
864 .state_table_info_delta
865 .iter()
866 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
867 .collect(),
868 vector_index_delta: pb_version_delta
869 .vector_index_delta
870 .into_iter()
871 .map(|(table_id, delta)| (TableId::new(table_id), delta.into()))
872 .collect(),
873 }
874 }
875}
876
877#[derive(Debug, PartialEq, Clone)]
878pub struct IntraLevelDeltaCommon<T> {
879 pub level_idx: u32,
880 pub l0_sub_level_id: u64,
881 pub removed_table_ids: HashSet<HummockSstableId>,
882 pub inserted_table_infos: Vec<T>,
883 pub vnode_partition_count: u32,
884 pub compaction_group_version_id: u64,
885}
886
887pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
888
889impl IntraLevelDelta {
890 pub fn estimated_encode_len(&self) -> usize {
891 size_of::<u32>()
892 + size_of::<u64>()
893 + self.removed_table_ids.len() * size_of::<u32>()
894 + self
895 .inserted_table_infos
896 .iter()
897 .map(|sst| sst.estimated_encode_len())
898 .sum::<usize>()
899 + size_of::<u32>()
900 }
901}
902
903impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
904where
905 T: From<PbSstableInfo>,
906{
907 fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
908 Self {
909 level_idx: pb_intra_level_delta.level_idx,
910 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
911 removed_table_ids: HashSet::from_iter(
912 pb_intra_level_delta
913 .removed_table_ids
914 .iter()
915 .map(|sst_id| (*sst_id).into()),
916 ),
917 inserted_table_infos: pb_intra_level_delta
918 .inserted_table_infos
919 .into_iter()
920 .map(Into::into)
921 .collect_vec(),
922 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
923 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
924 }
925 }
926}
927
928impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
929where
930 PbSstableInfo: From<T>,
931{
932 fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
933 Self {
934 level_idx: intra_level_delta.level_idx,
935 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
936 removed_table_ids: intra_level_delta
937 .removed_table_ids
938 .into_iter()
939 .map(|sst_id| sst_id.inner())
940 .collect(),
941 inserted_table_infos: intra_level_delta
942 .inserted_table_infos
943 .into_iter()
944 .map(Into::into)
945 .collect_vec(),
946 vnode_partition_count: intra_level_delta.vnode_partition_count,
947 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
948 }
949 }
950}
951
952impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
953where
954 PbSstableInfo: for<'a> From<&'a T>,
955{
956 fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
957 Self {
958 level_idx: intra_level_delta.level_idx,
959 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
960 removed_table_ids: intra_level_delta
961 .removed_table_ids
962 .iter()
963 .map(|sst_id| sst_id.inner())
964 .collect(),
965 inserted_table_infos: intra_level_delta
966 .inserted_table_infos
967 .iter()
968 .map(Into::into)
969 .collect_vec(),
970 vnode_partition_count: intra_level_delta.vnode_partition_count,
971 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
972 }
973 }
974}
975
976impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
977where
978 T: for<'a> From<&'a PbSstableInfo>,
979{
980 fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
981 Self {
982 level_idx: pb_intra_level_delta.level_idx,
983 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
984 removed_table_ids: HashSet::from_iter(
985 pb_intra_level_delta
986 .removed_table_ids
987 .iter()
988 .map(|sst_id| (*sst_id).into()),
989 ),
990 inserted_table_infos: pb_intra_level_delta
991 .inserted_table_infos
992 .iter()
993 .map(Into::into)
994 .collect_vec(),
995 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
996 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
997 }
998 }
999}
1000
1001impl IntraLevelDelta {
1002 pub fn new(
1003 level_idx: u32,
1004 l0_sub_level_id: u64,
1005 removed_table_ids: HashSet<HummockSstableId>,
1006 inserted_table_infos: Vec<SstableInfo>,
1007 vnode_partition_count: u32,
1008 compaction_group_version_id: u64,
1009 ) -> Self {
1010 Self {
1011 level_idx,
1012 l0_sub_level_id,
1013 removed_table_ids,
1014 inserted_table_infos,
1015 vnode_partition_count,
1016 compaction_group_version_id,
1017 }
1018 }
1019}
1020
1021#[derive(Debug, PartialEq, Clone)]
1022pub enum GroupDeltaCommon<T> {
1023 NewL0SubLevel(Vec<T>),
1024 IntraLevel(IntraLevelDeltaCommon<T>),
1025 GroupConstruct(Box<PbGroupConstruct>),
1026 GroupDestroy(PbGroupDestroy),
1027 GroupMerge(PbGroupMerge),
1028}
1029
1030pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
1031
1032impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
1033where
1034 T: From<PbSstableInfo>,
1035{
1036 fn from(pb_group_delta: PbGroupDelta) -> Self {
1037 match pb_group_delta.delta_type {
1038 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1039 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1040 }
1041 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1042 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
1043 }
1044 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1045 GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1046 }
1047 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1048 GroupDeltaCommon::GroupMerge(pb_group_merge)
1049 }
1050 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1051 pb_new_sub_level
1052 .inserted_table_infos
1053 .into_iter()
1054 .map(T::from)
1055 .collect(),
1056 ),
1057 None => panic!("delta_type is not set"),
1058 }
1059 }
1060}
1061
1062impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1063where
1064 PbSstableInfo: From<T>,
1065{
1066 fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1067 match group_delta {
1068 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1069 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1070 },
1071 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1072 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1073 },
1074 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1075 delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1076 },
1077 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1078 delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1079 },
1080 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1081 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1082 inserted_table_infos: new_sub_level
1083 .into_iter()
1084 .map(PbSstableInfo::from)
1085 .collect(),
1086 })),
1087 },
1088 }
1089 }
1090}
1091
1092impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1093where
1094 PbSstableInfo: for<'a> From<&'a T>,
1095{
1096 fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1097 match group_delta {
1098 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1099 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1100 },
1101 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1102 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1103 },
1104 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1105 delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1106 },
1107 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1108 delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1109 },
1110 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1111 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1112 inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1113 })),
1114 },
1115 }
1116 }
1117}
1118
1119impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1120where
1121 T: for<'a> From<&'a PbSstableInfo>,
1122{
1123 fn from(pb_group_delta: &PbGroupDelta) -> Self {
1124 match &pb_group_delta.delta_type {
1125 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1126 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1127 }
1128 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1129 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1130 }
1131 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1132 GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1133 }
1134 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1135 GroupDeltaCommon::GroupMerge(*pb_group_merge)
1136 }
1137 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1138 pb_new_sub_level
1139 .inserted_table_infos
1140 .iter()
1141 .map(T::from)
1142 .collect(),
1143 ),
1144 None => panic!("delta_type is not set"),
1145 }
1146 }
1147}
1148
1149#[derive(Debug, PartialEq, Clone)]
1150pub struct GroupDeltasCommon<T> {
1151 pub group_deltas: Vec<GroupDeltaCommon<T>>,
1152}
1153
1154impl<T> Default for GroupDeltasCommon<T> {
1155 fn default() -> Self {
1156 Self {
1157 group_deltas: vec![],
1158 }
1159 }
1160}
1161
1162pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1163
1164impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1165where
1166 T: From<PbSstableInfo>,
1167{
1168 fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1169 Self {
1170 group_deltas: pb_group_deltas
1171 .group_deltas
1172 .into_iter()
1173 .map(GroupDeltaCommon::from)
1174 .collect_vec(),
1175 }
1176 }
1177}
1178
1179impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1180where
1181 PbSstableInfo: From<T>,
1182{
1183 fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1184 Self {
1185 group_deltas: group_deltas
1186 .group_deltas
1187 .into_iter()
1188 .map(|group_delta| group_delta.into())
1189 .collect_vec(),
1190 }
1191 }
1192}
1193
1194impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1195where
1196 PbSstableInfo: for<'a> From<&'a T>,
1197{
1198 fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1199 Self {
1200 group_deltas: group_deltas
1201 .group_deltas
1202 .iter()
1203 .map(|group_delta| group_delta.into())
1204 .collect_vec(),
1205 }
1206 }
1207}
1208
1209impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1210where
1211 T: for<'a> From<&'a PbSstableInfo>,
1212{
1213 fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1214 Self {
1215 group_deltas: pb_group_deltas
1216 .group_deltas
1217 .iter()
1218 .map(GroupDeltaCommon::from)
1219 .collect_vec(),
1220 }
1221 }
1222}
1223
1224impl<T> GroupDeltasCommon<T>
1225where
1226 PbSstableInfo: for<'a> From<&'a T>,
1227{
1228 pub fn to_protobuf(&self) -> PbGroupDeltas {
1229 self.into()
1230 }
1231}
1232
1233impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1234 #[expect(deprecated)]
1235 fn from(delta: HummockVersionDelta) -> Self {
1236 Self {
1237 id: delta.id,
1238 prev_id: delta.prev_id,
1239 group_deltas: delta.group_deltas,
1240 max_committed_epoch: delta.max_committed_epoch,
1241 trivial_move: delta.trivial_move,
1242 new_table_watermarks: delta.new_table_watermarks,
1243 removed_table_ids: delta.removed_table_ids,
1244 change_log_delta: delta
1245 .change_log_delta
1246 .into_iter()
1247 .map(|(k, v)| {
1248 (
1249 k,
1250 ChangeLogDeltaCommon {
1251 truncate_epoch: v.truncate_epoch,
1252 new_log: EpochNewChangeLogCommon {
1253 new_value: Vec::new(),
1254 old_value: Vec::new(),
1255 non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1256 checkpoint_epoch: v.new_log.checkpoint_epoch,
1257 },
1258 },
1259 )
1260 })
1261 .collect(),
1262 state_table_info_delta: delta.state_table_info_delta,
1263 vector_index_delta: delta.vector_index_delta,
1264 }
1265 }
1266}
1267
1268impl From<HummockVersion> for LocalHummockVersion {
1269 #[expect(deprecated)]
1270 fn from(version: HummockVersion) -> Self {
1271 Self {
1272 id: version.id,
1273 levels: version.levels,
1274 max_committed_epoch: version.max_committed_epoch,
1275 table_watermarks: version.table_watermarks,
1276 table_change_log: version
1277 .table_change_log
1278 .into_iter()
1279 .map(|(k, v)| {
1280 let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1281 .change_log_into_iter()
1282 .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1283 new_value: Vec::new(),
1284 old_value: Vec::new(),
1285 non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1286 checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1287 })
1288 .collect();
1289 (k, TableChangeLogCommon::new(epoch_new_change_logs))
1290 })
1291 .collect(),
1292 state_table_info: version.state_table_info,
1293 vector_indexes: version.vector_indexes,
1294 }
1295 }
1296}