1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{
30 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
31};
32use crate::compaction_group::StaticCompactionGroupId;
33use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
34use crate::level::LevelsCommon;
35use crate::sstable_info::SstableInfo;
36use crate::table_watermark::TableWatermarks;
37use crate::vector_index::{VectorIndex, VectorIndexDelta};
38use crate::{
39 CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
40 HummockSstableObjectId, HummockVersionId,
41};
42
/// Per-table state info of a hummock version, plus an index from each
/// compaction group to the tables that belong to it.
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionStateTableInfo {
    /// Info of every tracked state table, keyed by table id.
    state_table_info: HashMap<TableId, PbStateTableInfo>,

    /// Member table ids of every compaction group. Derived from
    /// `state_table_info` and kept in sync with it by `apply_delta`.
    compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
}
50
51impl HummockVersionStateTableInfo {
52 pub fn empty() -> Self {
53 Self {
54 state_table_info: HashMap::new(),
55 compaction_group_member_tables: HashMap::new(),
56 }
57 }
58
59 fn build_compaction_group_member_tables(
60 state_table_info: &HashMap<TableId, PbStateTableInfo>,
61 ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
62 let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
63 for (table_id, info) in state_table_info {
64 assert!(
65 ret.entry(info.compaction_group_id)
66 .or_default()
67 .insert(*table_id)
68 );
69 }
70 ret
71 }
72
73 pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
74 self.state_table_info
75 .iter()
76 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
77 .collect()
78 }
79
80 pub fn from_protobuf(state_table_info: &HashMap<u32, PbStateTableInfo>) -> Self {
81 let state_table_info = state_table_info
82 .iter()
83 .map(|(table_id, info)| (TableId::new(*table_id), *info))
84 .collect();
85 let compaction_group_member_tables =
86 Self::build_compaction_group_member_tables(&state_table_info);
87 Self {
88 state_table_info,
89 compaction_group_member_tables,
90 }
91 }
92
93 pub fn to_protobuf(&self) -> HashMap<u32, PbStateTableInfo> {
94 self.state_table_info
95 .iter()
96 .map(|(table_id, info)| (table_id.as_raw_id(), *info))
97 .collect()
98 }
99
100 pub fn apply_delta(
101 &mut self,
102 delta: &HashMap<TableId, StateTableInfoDelta>,
103 removed_table_id: &HashSet<TableId>,
104 ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
105 let mut changed_table = HashMap::new();
106 let mut has_bumped_committed_epoch = false;
107 fn remove_table_from_compaction_group(
108 compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
109 compaction_group_id: CompactionGroupId,
110 table_id: TableId,
111 ) {
112 let member_tables = compaction_group_member_tables
113 .get_mut(&compaction_group_id)
114 .expect("should exist");
115 assert!(member_tables.remove(&table_id));
116 if member_tables.is_empty() {
117 assert!(
118 compaction_group_member_tables
119 .remove(&compaction_group_id)
120 .is_some()
121 );
122 }
123 }
124 for table_id in removed_table_id {
125 if let Some(prev_info) = self.state_table_info.remove(table_id) {
126 remove_table_from_compaction_group(
127 &mut self.compaction_group_member_tables,
128 prev_info.compaction_group_id,
129 *table_id,
130 );
131 assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
132 } else {
133 warn!(
134 %table_id,
135 "table to remove does not exist"
136 );
137 }
138 }
139 for (table_id, delta) in delta {
140 if removed_table_id.contains(table_id) {
141 continue;
142 }
143 let new_info = StateTableInfo {
144 committed_epoch: delta.committed_epoch,
145 compaction_group_id: delta.compaction_group_id,
146 };
147 match self.state_table_info.entry(*table_id) {
148 Entry::Occupied(mut entry) => {
149 let prev_info = entry.get_mut();
150 assert!(
151 new_info.committed_epoch >= prev_info.committed_epoch,
152 "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
153 table_id.as_raw_id(),
154 prev_info,
155 new_info
156 );
157 if new_info.committed_epoch > prev_info.committed_epoch {
158 has_bumped_committed_epoch = true;
159 }
160 if prev_info.compaction_group_id != new_info.compaction_group_id {
161 remove_table_from_compaction_group(
163 &mut self.compaction_group_member_tables,
164 prev_info.compaction_group_id,
165 *table_id,
166 );
167 assert!(
168 self.compaction_group_member_tables
169 .entry(new_info.compaction_group_id)
170 .or_default()
171 .insert(*table_id)
172 );
173 }
174 let prev_info = replace(prev_info, new_info);
175 changed_table.insert(*table_id, Some(prev_info));
176 }
177 Entry::Vacant(entry) => {
178 assert!(
179 self.compaction_group_member_tables
180 .entry(new_info.compaction_group_id)
181 .or_default()
182 .insert(*table_id)
183 );
184 has_bumped_committed_epoch = true;
185 entry.insert(new_info);
186 changed_table.insert(*table_id, None);
187 }
188 }
189 }
190 debug_assert_eq!(
191 self.compaction_group_member_tables,
192 Self::build_compaction_group_member_tables(&self.state_table_info)
193 );
194 (changed_table, has_bumped_committed_epoch)
195 }
196
197 pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
198 &self.state_table_info
199 }
200
201 pub fn compaction_group_member_table_ids(
202 &self,
203 compaction_group_id: CompactionGroupId,
204 ) -> &BTreeSet<TableId> {
205 static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
206 self.compaction_group_member_tables
207 .get(&compaction_group_id)
208 .unwrap_or_else(|| EMPTY_SET.deref())
209 }
210
211 pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
212 &self.compaction_group_member_tables
213 }
214
215 pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
216 self.state_table_info
217 .values()
218 .map(|info| info.committed_epoch)
219 .max()
220 }
221}
222
/// In-memory representation of a hummock version.
///
/// `T` is the SST info type stored in `levels`; `L` is the type parameter of
/// the table change logs and defaults to `T`.
#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionCommon<T, L = T> {
    /// Id of this version.
    pub id: HummockVersionId,
    /// Levels of every compaction group, keyed by compaction group id.
    pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
    /// Deprecated; still carried through the protobuf conversions.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    /// Table watermarks, keyed by table id.
    pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
    /// Table change logs, keyed by table id.
    pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
    /// Per-table state info (committed epoch and compaction group).
    pub state_table_info: HummockVersionStateTableInfo,
    /// Vector indexes, keyed by table id.
    pub vector_indexes: HashMap<TableId, VectorIndex>,
}
234
/// Hummock version whose levels and change logs both hold `SstableInfo`.
pub type HummockVersion = HummockVersionCommon<SstableInfo>;

/// Hummock version whose change-log type parameter is `()`; produced by
/// `HummockVersion::split_change_log`.
pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
238
239impl Default for HummockVersion {
240 fn default() -> Self {
241 HummockVersion::from(&PbHummockVersion::default())
242 }
243}
244
245impl<T> HummockVersionCommon<T>
246where
247 T: for<'a> From<&'a PbSstableInfo>,
248 PbSstableInfo: for<'a> From<&'a T>,
249{
250 pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
253 pb_version.into()
254 }
255
256 pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
259 pb_version.into()
260 }
261
262 pub fn to_protobuf(&self) -> PbHummockVersion {
263 self.into()
264 }
265}
266
267impl HummockVersion {
268 pub fn estimated_encode_len(&self) -> usize {
269 self.levels.len() * size_of::<CompactionGroupId>()
270 + self
271 .levels
272 .values()
273 .map(|level| level.estimated_encode_len())
274 .sum::<usize>()
275 + self.table_watermarks.len() * size_of::<u32>()
276 + self
277 .table_watermarks
278 .values()
279 .map(|table_watermark| table_watermark.estimated_encode_len())
280 .sum::<usize>()
281 + self
282 .table_change_log
283 .values()
284 .map(|c| {
285 c.iter()
286 .map(|l| {
287 l.old_value
288 .iter()
289 .chain(l.new_value.iter())
290 .map(|s| s.estimated_encode_len())
291 .sum::<usize>()
292 })
293 .sum::<usize>()
294 })
295 .sum::<usize>()
296 }
297}
298
299impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
300where
301 T: for<'a> From<&'a PbSstableInfo>,
302{
303 fn from(pb_version: &PbHummockVersion) -> Self {
304 #[expect(deprecated)]
305 Self {
306 id: HummockVersionId(pb_version.id),
307 levels: pb_version
308 .levels
309 .iter()
310 .map(|(group_id, levels)| {
311 (*group_id as CompactionGroupId, LevelsCommon::from(levels))
312 })
313 .collect(),
314 max_committed_epoch: pb_version.max_committed_epoch,
315 table_watermarks: pb_version
316 .table_watermarks
317 .iter()
318 .map(|(table_id, table_watermark)| {
319 (
320 TableId::new(*table_id),
321 Arc::new(TableWatermarks::from(table_watermark)),
322 )
323 })
324 .collect(),
325 table_change_log: pb_version
326 .table_change_logs
327 .iter()
328 .map(|(table_id, change_log)| {
329 (
330 TableId::new(*table_id),
331 TableChangeLogCommon::from_protobuf(change_log),
332 )
333 })
334 .collect(),
335 state_table_info: HummockVersionStateTableInfo::from_protobuf(
336 &pb_version.state_table_info,
337 ),
338 vector_indexes: pb_version
339 .vector_indexes
340 .iter()
341 .map(|(table_id, index)| (table_id.into(), index.clone().into()))
342 .collect(),
343 }
344 }
345}
346
347impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
348where
349 PbSstableInfo: for<'a> From<&'a T>,
350{
351 fn from(version: &HummockVersionCommon<T>) -> Self {
352 #[expect(deprecated)]
353 Self {
354 id: version.id.0,
355 levels: version
356 .levels
357 .iter()
358 .map(|(group_id, levels)| (*group_id as _, levels.into()))
359 .collect(),
360 max_committed_epoch: version.max_committed_epoch,
361 table_watermarks: version
362 .table_watermarks
363 .iter()
364 .map(|(table_id, watermark)| (table_id.as_raw_id(), watermark.as_ref().into()))
365 .collect(),
366 table_change_logs: version
367 .table_change_log
368 .iter()
369 .map(|(table_id, change_log)| (table_id.as_raw_id(), change_log.to_protobuf()))
370 .collect(),
371 state_table_info: version.state_table_info.to_protobuf(),
372 vector_indexes: version
373 .vector_indexes
374 .iter()
375 .map(|(table_id, index)| (table_id.as_raw_id(), index.clone().into()))
376 .collect(),
377 }
378 }
379}
380
381impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
382where
383 PbSstableInfo: From<T>,
384 PbSstableInfo: for<'a> From<&'a T>,
385{
386 fn from(version: HummockVersionCommon<T>) -> Self {
387 #[expect(deprecated)]
388 Self {
389 id: version.id.0,
390 levels: version
391 .levels
392 .into_iter()
393 .map(|(group_id, levels)| (group_id as _, levels.into()))
394 .collect(),
395 max_committed_epoch: version.max_committed_epoch,
396 table_watermarks: version
397 .table_watermarks
398 .into_iter()
399 .map(|(table_id, watermark)| (table_id.as_raw_id(), watermark.as_ref().into()))
400 .collect(),
401 table_change_logs: version
402 .table_change_log
403 .into_iter()
404 .map(|(table_id, change_log)| (table_id.as_raw_id(), change_log.to_protobuf()))
405 .collect(),
406 state_table_info: version.state_table_info.to_protobuf(),
407 vector_indexes: version
408 .vector_indexes
409 .into_iter()
410 .map(|(table_id, index)| (table_id.as_raw_id(), index.into()))
411 .collect(),
412 }
413 }
414}
415
416impl HummockVersion {
417 pub fn next_version_id(&self) -> HummockVersionId {
418 self.id.next()
419 }
420
421 pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
422 self.state_table_info.state_table_info.is_empty()
424 && self.levels.values().any(|group| {
425 #[expect(deprecated)]
427 !group.member_table_ids.is_empty()
428 })
429 }
430
431 pub fn may_fill_backward_compatible_state_table_info_delta(
432 &self,
433 delta: &mut HummockVersionDelta,
434 ) {
435 #[expect(deprecated)]
436 for (cg_id, group) in &self.levels {
438 for table_id in &group.member_table_ids {
439 assert!(
440 delta
441 .state_table_info_delta
442 .insert(
443 TableId::new(*table_id),
444 StateTableInfoDelta {
445 committed_epoch: self.max_committed_epoch,
446 compaction_group_id: *cg_id,
447 }
448 )
449 .is_none(),
450 "duplicate table id {} in cg {}",
451 table_id,
452 cg_id
453 );
454 }
455 }
456 }
457
458 pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
459 #[expect(deprecated)]
460 let mut init_version = HummockVersion {
461 id: FIRST_VERSION_ID,
462 levels: Default::default(),
463 max_committed_epoch: INVALID_EPOCH,
464 table_watermarks: HashMap::new(),
465 table_change_log: HashMap::new(),
466 state_table_info: HummockVersionStateTableInfo::empty(),
467 vector_indexes: Default::default(),
468 };
469 for group_id in [
470 StaticCompactionGroupId::StateDefault as CompactionGroupId,
471 StaticCompactionGroupId::MaterializedView as CompactionGroupId,
472 ] {
473 init_version.levels.insert(
474 group_id,
475 build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
476 );
477 }
478 init_version
479 }
480
481 pub fn version_delta_after(&self) -> HummockVersionDelta {
482 #[expect(deprecated)]
483 HummockVersionDelta {
484 id: self.next_version_id(),
485 prev_id: self.id,
486 trivial_move: false,
487 max_committed_epoch: self.max_committed_epoch,
488 group_deltas: Default::default(),
489 new_table_watermarks: HashMap::new(),
490 removed_table_ids: HashSet::new(),
491 change_log_delta: HashMap::new(),
492 state_table_info_delta: Default::default(),
493 vector_index_delta: Default::default(),
494 }
495 }
496
497 pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
498 let table_change_log = {
499 let mut table_change_log = HashMap::new();
500 for (table_id, log) in &mut self.table_change_log {
501 let change_log_iter =
502 log.change_log_iter_mut()
503 .map(|item| EpochNewChangeLogCommon {
504 new_value: std::mem::take(&mut item.new_value),
505 old_value: std::mem::take(&mut item.old_value),
506 non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
507 checkpoint_epoch: item.checkpoint_epoch,
508 });
509 table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
510 }
511
512 table_change_log
513 };
514
515 let local_version = LocalHummockVersion::from(self);
516
517 (local_version, table_change_log)
518 }
519}
520
521impl<T, L> HummockVersionCommon<T, L> {
522 pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
523 self.state_table_info
524 .info()
525 .get(&table_id)
526 .map(|info| info.committed_epoch)
527 }
528}
529
/// In-memory representation of the delta between two consecutive hummock versions.
///
/// `T` is the SST info type stored in the group deltas; `L` is the type
/// parameter of the change log deltas and defaults to `T`.
#[derive(Debug, PartialEq, Clone)]
pub struct HummockVersionDeltaCommon<T, L = T> {
    /// Id of the version produced by this delta.
    pub id: HummockVersionId,
    /// Id of the version this delta is based on.
    pub prev_id: HummockVersionId,
    /// Deltas of every touched compaction group, keyed by compaction group id.
    pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
    /// Deprecated; still carried through the protobuf conversions.
    #[deprecated]
    pub(crate) max_committed_epoch: u64,
    pub trivial_move: bool,
    /// New table watermarks, keyed by table id.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Ids of the tables removed by this delta.
    pub removed_table_ids: HashSet<TableId>,
    /// Change log deltas, keyed by table id.
    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
    /// State table info deltas, keyed by table id.
    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
    /// Vector index deltas, keyed by table id.
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
}
544
/// Version delta whose group deltas and change log deltas both hold `SstableInfo`.
pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;

/// Version delta whose change-log type parameter is `()`.
pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
548
549impl Default for HummockVersionDelta {
550 fn default() -> Self {
551 HummockVersionDelta::from(&PbHummockVersionDelta::default())
552 }
553}
554
555impl<T> HummockVersionDeltaCommon<T>
556where
557 T: for<'a> From<&'a PbSstableInfo>,
558 PbSstableInfo: for<'a> From<&'a T>,
559{
560 pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
563 delta.into()
564 }
565
566 pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
569 delta.into()
570 }
571
572 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
573 self.into()
574 }
575}
576
/// Gives access to the SST id of a value.
pub trait SstableIdReader {
    /// Returns the SST id.
    fn sst_id(&self) -> HummockSstableId;
}
580
/// Gives access to the SST object id of a value.
pub trait ObjectIdReader {
    /// Returns the SST object id.
    fn object_id(&self) -> HummockSstableObjectId;
}
584
585impl<T> HummockVersionDeltaCommon<T>
586where
587 T: SstableIdReader + ObjectIdReader,
588{
589 pub fn newly_added_object_ids(
594 &self,
595 exclude_table_change_log: bool,
596 ) -> HashSet<HummockObjectId> {
597 match HummockObjectId::Sstable(0.into()) {
601 HummockObjectId::Sstable(_) => {}
602 HummockObjectId::VectorFile(_) => {}
603 HummockObjectId::HnswGraphFile(_) => {}
604 };
605 self.newly_added_sst_infos(exclude_table_change_log)
606 .map(|sst| HummockObjectId::Sstable(sst.object_id()))
607 .chain(
608 self.vector_index_delta
609 .values()
610 .flat_map(|vector_index_delta| {
611 vector_index_delta
612 .newly_added_objects()
613 .map(|(object_id, _)| object_id)
614 }),
615 )
616 .collect()
617 }
618
619 pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
620 self.newly_added_sst_infos(exclude_table_change_log)
621 .map(|sst| sst.sst_id())
622 .collect()
623 }
624}
625
626impl<T> HummockVersionDeltaCommon<T> {
627 pub fn newly_added_sst_infos(
628 &self,
629 exclude_table_change_log: bool,
630 ) -> impl Iterator<Item = &'_ T> {
631 let may_table_change_delta = if exclude_table_change_log {
632 None
633 } else {
634 Some(self.change_log_delta.values())
635 };
636 self.group_deltas
637 .values()
638 .flat_map(|group_deltas| {
639 group_deltas.group_deltas.iter().flat_map(|group_delta| {
640 let sst_slice = match &group_delta {
641 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
642 | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
643 inserted_table_infos,
644 ..
645 }) => Some(inserted_table_infos.iter()),
646 GroupDeltaCommon::GroupConstruct(_)
647 | GroupDeltaCommon::GroupDestroy(_)
648 | GroupDeltaCommon::GroupMerge(_)
649 | GroupDeltaCommon::TruncateTables(_) => None,
650 };
651 sst_slice.into_iter().flatten()
652 })
653 })
654 .chain(
655 may_table_change_delta
656 .map(|v| {
657 v.flat_map(|delta| {
658 let new_log = &delta.new_log;
659 new_log.new_value.iter().chain(new_log.old_value.iter())
660 })
661 })
662 .into_iter()
663 .flatten(),
664 )
665 }
666}
667
impl HummockVersionDelta {
    /// Returns the value of the deprecated `max_committed_epoch` field.
    #[expect(deprecated)]
    pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
        self.max_committed_epoch
    }
}
674
675impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
676where
677 T: for<'a> From<&'a PbSstableInfo>,
678{
679 fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
680 #[expect(deprecated)]
681 Self {
682 id: HummockVersionId(pb_version_delta.id),
683 prev_id: HummockVersionId(pb_version_delta.prev_id),
684 group_deltas: pb_version_delta
685 .group_deltas
686 .iter()
687 .map(|(group_id, deltas)| {
688 (
689 *group_id as CompactionGroupId,
690 GroupDeltasCommon::from(deltas),
691 )
692 })
693 .collect(),
694 max_committed_epoch: pb_version_delta.max_committed_epoch,
695 trivial_move: pb_version_delta.trivial_move,
696 new_table_watermarks: pb_version_delta
697 .new_table_watermarks
698 .iter()
699 .map(|(table_id, watermarks)| {
700 (TableId::new(*table_id), TableWatermarks::from(watermarks))
701 })
702 .collect(),
703 removed_table_ids: pb_version_delta
704 .removed_table_ids
705 .iter()
706 .map(|table_id| TableId::new(*table_id))
707 .collect(),
708 change_log_delta: pb_version_delta
709 .change_log_delta
710 .iter()
711 .map(|(table_id, log_delta)| {
712 (
713 TableId::new(*table_id),
714 ChangeLogDeltaCommon {
715 truncate_epoch: log_delta.truncate_epoch,
716 new_log: log_delta.new_log.as_ref().unwrap().into(),
717 },
718 )
719 })
720 .collect(),
721
722 state_table_info_delta: pb_version_delta
723 .state_table_info_delta
724 .iter()
725 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
726 .collect(),
727 vector_index_delta: pb_version_delta
728 .vector_index_delta
729 .iter()
730 .map(|(table_id, delta)| (TableId::new(*table_id), delta.clone().into()))
731 .collect(),
732 }
733 }
734}
735
736impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
737where
738 PbSstableInfo: for<'a> From<&'a T>,
739{
740 fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
741 #[expect(deprecated)]
742 Self {
743 id: version_delta.id.0,
744 prev_id: version_delta.prev_id.0,
745 group_deltas: version_delta
746 .group_deltas
747 .iter()
748 .map(|(group_id, deltas)| (*group_id as _, deltas.into()))
749 .collect(),
750 max_committed_epoch: version_delta.max_committed_epoch,
751 trivial_move: version_delta.trivial_move,
752 new_table_watermarks: version_delta
753 .new_table_watermarks
754 .iter()
755 .map(|(table_id, watermarks)| (table_id.as_raw_id(), watermarks.into()))
756 .collect(),
757 removed_table_ids: version_delta
758 .removed_table_ids
759 .iter()
760 .map(|table_id| table_id.as_raw_id())
761 .collect(),
762 change_log_delta: version_delta
763 .change_log_delta
764 .iter()
765 .map(|(table_id, log_delta)| (table_id.as_raw_id(), log_delta.into()))
766 .collect(),
767 state_table_info_delta: version_delta
768 .state_table_info_delta
769 .iter()
770 .map(|(table_id, delta)| (table_id.as_raw_id(), *delta))
771 .collect(),
772 vector_index_delta: version_delta
773 .vector_index_delta
774 .iter()
775 .map(|(table_id, delta)| (table_id.as_raw_id(), delta.clone().into()))
776 .collect(),
777 }
778 }
779}
780
781impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
782where
783 PbSstableInfo: From<T>,
784{
785 fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
786 #[expect(deprecated)]
787 Self {
788 id: version_delta.id.0,
789 prev_id: version_delta.prev_id.0,
790 group_deltas: version_delta
791 .group_deltas
792 .into_iter()
793 .map(|(group_id, deltas)| (group_id as _, deltas.into()))
794 .collect(),
795 max_committed_epoch: version_delta.max_committed_epoch,
796 trivial_move: version_delta.trivial_move,
797 new_table_watermarks: version_delta
798 .new_table_watermarks
799 .into_iter()
800 .map(|(table_id, watermarks)| (table_id.as_raw_id(), watermarks.into()))
801 .collect(),
802 removed_table_ids: version_delta
803 .removed_table_ids
804 .into_iter()
805 .map(|table_id| table_id.as_raw_id())
806 .collect(),
807 change_log_delta: version_delta
808 .change_log_delta
809 .into_iter()
810 .map(|(table_id, log_delta)| (table_id.as_raw_id(), log_delta.into()))
811 .collect(),
812 state_table_info_delta: version_delta
813 .state_table_info_delta
814 .into_iter()
815 .map(|(table_id, delta)| (table_id.as_raw_id(), delta))
816 .collect(),
817 vector_index_delta: version_delta
818 .vector_index_delta
819 .into_iter()
820 .map(|(table_id, delta)| (table_id.as_raw_id(), delta.into()))
821 .collect(),
822 }
823 }
824}
825
826impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
827where
828 T: From<PbSstableInfo>,
829{
830 fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
831 #[expect(deprecated)]
832 Self {
833 id: HummockVersionId(pb_version_delta.id),
834 prev_id: HummockVersionId(pb_version_delta.prev_id),
835 group_deltas: pb_version_delta
836 .group_deltas
837 .into_iter()
838 .map(|(group_id, deltas)| (group_id as CompactionGroupId, deltas.into()))
839 .collect(),
840 max_committed_epoch: pb_version_delta.max_committed_epoch,
841 trivial_move: pb_version_delta.trivial_move,
842 new_table_watermarks: pb_version_delta
843 .new_table_watermarks
844 .into_iter()
845 .map(|(table_id, watermarks)| (TableId::new(table_id), watermarks.into()))
846 .collect(),
847 removed_table_ids: pb_version_delta
848 .removed_table_ids
849 .into_iter()
850 .map(TableId::new)
851 .collect(),
852 change_log_delta: pb_version_delta
853 .change_log_delta
854 .iter()
855 .map(|(table_id, log_delta)| {
856 (
857 TableId::new(*table_id),
858 ChangeLogDeltaCommon {
859 new_log: log_delta.new_log.clone().unwrap().into(),
860 truncate_epoch: log_delta.truncate_epoch,
861 },
862 )
863 })
864 .collect(),
865 state_table_info_delta: pb_version_delta
866 .state_table_info_delta
867 .iter()
868 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
869 .collect(),
870 vector_index_delta: pb_version_delta
871 .vector_index_delta
872 .into_iter()
873 .map(|(table_id, delta)| (TableId::new(table_id), delta.into()))
874 .collect(),
875 }
876 }
877}
878
/// In-memory representation of `PbIntraLevelDelta`; `T` is the SST info type
/// of the inserted SSTs.
#[derive(Debug, PartialEq, Clone)]
pub struct IntraLevelDeltaCommon<T> {
    pub level_idx: u32,
    pub l0_sub_level_id: u64,
    /// SST ids removed by this delta.
    pub removed_table_ids: HashSet<HummockSstableId>,
    /// SST infos inserted by this delta.
    pub inserted_table_infos: Vec<T>,
    pub vnode_partition_count: u32,
    pub compaction_group_version_id: u64,
}
888
/// Intra-level delta whose inserted SSTs are `SstableInfo`.
pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
890
891impl IntraLevelDelta {
892 pub fn estimated_encode_len(&self) -> usize {
893 size_of::<u32>()
894 + size_of::<u64>()
895 + self.removed_table_ids.len() * size_of::<u32>()
896 + self
897 .inserted_table_infos
898 .iter()
899 .map(|sst| sst.estimated_encode_len())
900 .sum::<usize>()
901 + size_of::<u32>()
902 }
903}
904
905impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
906where
907 T: From<PbSstableInfo>,
908{
909 fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
910 Self {
911 level_idx: pb_intra_level_delta.level_idx,
912 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
913 removed_table_ids: HashSet::from_iter(
914 pb_intra_level_delta
915 .removed_table_ids
916 .iter()
917 .map(|sst_id| (*sst_id).into()),
918 ),
919 inserted_table_infos: pb_intra_level_delta
920 .inserted_table_infos
921 .into_iter()
922 .map(Into::into)
923 .collect_vec(),
924 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
925 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
926 }
927 }
928}
929
930impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
931where
932 PbSstableInfo: From<T>,
933{
934 fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
935 Self {
936 level_idx: intra_level_delta.level_idx,
937 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
938 removed_table_ids: intra_level_delta
939 .removed_table_ids
940 .into_iter()
941 .map(|sst_id| sst_id.inner())
942 .collect(),
943 inserted_table_infos: intra_level_delta
944 .inserted_table_infos
945 .into_iter()
946 .map(Into::into)
947 .collect_vec(),
948 vnode_partition_count: intra_level_delta.vnode_partition_count,
949 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
950 }
951 }
952}
953
954impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
955where
956 PbSstableInfo: for<'a> From<&'a T>,
957{
958 fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
959 Self {
960 level_idx: intra_level_delta.level_idx,
961 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
962 removed_table_ids: intra_level_delta
963 .removed_table_ids
964 .iter()
965 .map(|sst_id| sst_id.inner())
966 .collect(),
967 inserted_table_infos: intra_level_delta
968 .inserted_table_infos
969 .iter()
970 .map(Into::into)
971 .collect_vec(),
972 vnode_partition_count: intra_level_delta.vnode_partition_count,
973 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
974 }
975 }
976}
977
978impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
979where
980 T: for<'a> From<&'a PbSstableInfo>,
981{
982 fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
983 Self {
984 level_idx: pb_intra_level_delta.level_idx,
985 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
986 removed_table_ids: HashSet::from_iter(
987 pb_intra_level_delta
988 .removed_table_ids
989 .iter()
990 .map(|sst_id| (*sst_id).into()),
991 ),
992 inserted_table_infos: pb_intra_level_delta
993 .inserted_table_infos
994 .iter()
995 .map(Into::into)
996 .collect_vec(),
997 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
998 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
999 }
1000 }
1001}
1002
impl IntraLevelDelta {
    /// Creates an intra-level delta from its fields; every argument is stored
    /// unchanged in the field of the same name.
    pub fn new(
        level_idx: u32,
        l0_sub_level_id: u64,
        removed_table_ids: HashSet<HummockSstableId>,
        inserted_table_infos: Vec<SstableInfo>,
        vnode_partition_count: u32,
        compaction_group_version_id: u64,
    ) -> Self {
        Self {
            level_idx,
            l0_sub_level_id,
            removed_table_ids,
            inserted_table_infos,
            vnode_partition_count,
            compaction_group_version_id,
        }
    }
}
1022
/// In-memory representation of the `delta_type` of a `PbGroupDelta`; `T` is
/// the SST info type.
#[derive(Debug, PartialEq, Clone)]
pub enum GroupDeltaCommon<T> {
    /// Carries the SST infos inserted with a new L0 sub level.
    NewL0SubLevel(Vec<T>),
    /// Carries an intra-level delta (removed SST ids and inserted SST infos).
    IntraLevel(IntraLevelDeltaCommon<T>),
    /// Carries the protobuf group-construct payload (boxed).
    GroupConstruct(Box<PbGroupConstruct>),
    /// Carries the protobuf group-destroy payload.
    GroupDestroy(PbGroupDestroy),
    /// Carries the protobuf group-merge payload.
    GroupMerge(PbGroupMerge),
    /// Carries the table ids of a truncate-tables delta.
    TruncateTables(HashSet<TableId>),
}
1032
/// Group delta whose SST info type is `SstableInfo`.
pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
1034
1035impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
1036where
1037 T: From<PbSstableInfo>,
1038{
1039 fn from(pb_group_delta: PbGroupDelta) -> Self {
1040 match pb_group_delta.delta_type {
1041 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1042 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1043 }
1044 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1045 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
1046 }
1047 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1048 GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1049 }
1050 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1051 GroupDeltaCommon::GroupMerge(pb_group_merge)
1052 }
1053 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1054 pb_new_sub_level
1055 .inserted_table_infos
1056 .into_iter()
1057 .map(T::from)
1058 .collect(),
1059 ),
1060 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1061 GroupDeltaCommon::TruncateTables(
1062 pb_truncate_tables
1063 .table_ids
1064 .iter()
1065 .map(Into::into)
1066 .collect(),
1067 )
1068 }
1069
1070 None => panic!("delta_type is not set"),
1071 }
1072 }
1073}
1074
1075impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1076where
1077 PbSstableInfo: From<T>,
1078{
1079 fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1080 match group_delta {
1081 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1082 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1083 },
1084 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1085 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1086 },
1087 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1088 delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1089 },
1090 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1091 delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1092 },
1093 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1094 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1095 inserted_table_infos: new_sub_level
1096 .into_iter()
1097 .map(PbSstableInfo::from)
1098 .collect(),
1099 })),
1100 },
1101 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1102 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1103 table_ids: table_ids.iter().map(Into::into).collect(),
1104 })),
1105 },
1106 }
1107 }
1108}
1109
1110impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1111where
1112 PbSstableInfo: for<'a> From<&'a T>,
1113{
1114 fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1115 match group_delta {
1116 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1117 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1118 },
1119 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1120 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1121 },
1122 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1123 delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1124 },
1125 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1126 delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1127 },
1128 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1129 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1130 inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1131 })),
1132 },
1133 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1134 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1135 table_ids: table_ids.iter().map(Into::into).collect(),
1136 })),
1137 },
1138 }
1139 }
1140}
1141
1142impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1143where
1144 T: for<'a> From<&'a PbSstableInfo>,
1145{
1146 fn from(pb_group_delta: &PbGroupDelta) -> Self {
1147 match &pb_group_delta.delta_type {
1148 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1149 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1150 }
1151 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1152 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1153 }
1154 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1155 GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1156 }
1157 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1158 GroupDeltaCommon::GroupMerge(*pb_group_merge)
1159 }
1160 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1161 pb_new_sub_level
1162 .inserted_table_infos
1163 .iter()
1164 .map(T::from)
1165 .collect(),
1166 ),
1167 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1168 GroupDeltaCommon::TruncateTables(
1169 pb_truncate_tables
1170 .table_ids
1171 .iter()
1172 .map(Into::into)
1173 .collect(),
1174 )
1175 }
1176 None => panic!("delta_type is not set"),
1177 }
1178 }
1179}
1180
/// A list of [`GroupDeltaCommon`] entries, generic over the SST info type `T`.
#[derive(Debug, PartialEq, Clone)]
pub struct GroupDeltasCommon<T> {
    /// The individual deltas held by this container.
    pub group_deltas: Vec<GroupDeltaCommon<T>>,
}
1185
1186impl<T> Default for GroupDeltasCommon<T> {
1187 fn default() -> Self {
1188 Self {
1189 group_deltas: vec![],
1190 }
1191 }
1192}
1193
/// [`GroupDeltasCommon`] specialized to [`SstableInfo`].
pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1195
1196impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1197where
1198 T: From<PbSstableInfo>,
1199{
1200 fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1201 Self {
1202 group_deltas: pb_group_deltas
1203 .group_deltas
1204 .into_iter()
1205 .map(GroupDeltaCommon::from)
1206 .collect_vec(),
1207 }
1208 }
1209}
1210
1211impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1212where
1213 PbSstableInfo: From<T>,
1214{
1215 fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1216 Self {
1217 group_deltas: group_deltas
1218 .group_deltas
1219 .into_iter()
1220 .map(|group_delta| group_delta.into())
1221 .collect_vec(),
1222 }
1223 }
1224}
1225
1226impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1227where
1228 PbSstableInfo: for<'a> From<&'a T>,
1229{
1230 fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1231 Self {
1232 group_deltas: group_deltas
1233 .group_deltas
1234 .iter()
1235 .map(|group_delta| group_delta.into())
1236 .collect_vec(),
1237 }
1238 }
1239}
1240
1241impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1242where
1243 T: for<'a> From<&'a PbSstableInfo>,
1244{
1245 fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1246 Self {
1247 group_deltas: pb_group_deltas
1248 .group_deltas
1249 .iter()
1250 .map(GroupDeltaCommon::from)
1251 .collect_vec(),
1252 }
1253 }
1254}
1255
1256impl<T> GroupDeltasCommon<T>
1257where
1258 PbSstableInfo: for<'a> From<&'a T>,
1259{
1260 pub fn to_protobuf(&self) -> PbGroupDeltas {
1261 self.into()
1262 }
1263}
1264
1265impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1266 #[expect(deprecated)]
1267 fn from(delta: HummockVersionDelta) -> Self {
1268 Self {
1269 id: delta.id,
1270 prev_id: delta.prev_id,
1271 group_deltas: delta.group_deltas,
1272 max_committed_epoch: delta.max_committed_epoch,
1273 trivial_move: delta.trivial_move,
1274 new_table_watermarks: delta.new_table_watermarks,
1275 removed_table_ids: delta.removed_table_ids,
1276 change_log_delta: delta
1277 .change_log_delta
1278 .into_iter()
1279 .map(|(k, v)| {
1280 (
1281 k,
1282 ChangeLogDeltaCommon {
1283 truncate_epoch: v.truncate_epoch,
1284 new_log: EpochNewChangeLogCommon {
1285 new_value: Vec::new(),
1286 old_value: Vec::new(),
1287 non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1288 checkpoint_epoch: v.new_log.checkpoint_epoch,
1289 },
1290 },
1291 )
1292 })
1293 .collect(),
1294 state_table_info_delta: delta.state_table_info_delta,
1295 vector_index_delta: delta.vector_index_delta,
1296 }
1297 }
1298}
1299
1300impl From<HummockVersion> for LocalHummockVersion {
1301 #[expect(deprecated)]
1302 fn from(version: HummockVersion) -> Self {
1303 Self {
1304 id: version.id,
1305 levels: version.levels,
1306 max_committed_epoch: version.max_committed_epoch,
1307 table_watermarks: version.table_watermarks,
1308 table_change_log: version
1309 .table_change_log
1310 .into_iter()
1311 .map(|(k, v)| {
1312 let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1313 .change_log_into_iter()
1314 .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1315 new_value: Vec::new(),
1316 old_value: Vec::new(),
1317 non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1318 checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1319 })
1320 .collect();
1321 (k, TableChangeLogCommon::new(epoch_new_change_logs))
1322 })
1323 .collect(),
1324 state_table_info: version.state_table_info,
1325 vector_indexes: version.vector_indexes,
1326 }
1327 }
1328}