1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::mem::{replace, size_of};
18use std::ops::Deref;
19use std::sync::{Arc, LazyLock};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::epoch::INVALID_EPOCH;
24use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType};
25use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
26use risingwave_pb::hummock::*;
27use tracing::warn;
28
29use crate::change_log::{
30 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
31};
32use crate::compaction_group::StaticCompactionGroupId;
33use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
34use crate::level::LevelsCommon;
35use crate::sstable_info::SstableInfo;
36use crate::table_watermark::TableWatermarks;
37use crate::vector_index::{VectorIndex, VectorIndexDelta};
38use crate::{
39 CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockObjectId, HummockSstableId,
40 HummockSstableObjectId, HummockVersionId,
41};
42
43#[derive(Debug, Clone, PartialEq)]
44pub struct HummockVersionStateTableInfo {
45 state_table_info: HashMap<TableId, PbStateTableInfo>,
46
47 compaction_group_member_tables: HashMap<CompactionGroupId, BTreeSet<TableId>>,
49}
50
51impl HummockVersionStateTableInfo {
52 pub fn empty() -> Self {
53 Self {
54 state_table_info: HashMap::new(),
55 compaction_group_member_tables: HashMap::new(),
56 }
57 }
58
59 fn build_compaction_group_member_tables(
60 state_table_info: &HashMap<TableId, PbStateTableInfo>,
61 ) -> HashMap<CompactionGroupId, BTreeSet<TableId>> {
62 let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new();
63 for (table_id, info) in state_table_info {
64 assert!(
65 ret.entry(info.compaction_group_id)
66 .or_default()
67 .insert(*table_id)
68 );
69 }
70 ret
71 }
72
73 pub fn build_table_compaction_group_id(&self) -> HashMap<TableId, CompactionGroupId> {
74 self.state_table_info
75 .iter()
76 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
77 .collect()
78 }
79
80 pub fn from_protobuf(state_table_info: &HashMap<u32, PbStateTableInfo>) -> Self {
81 let state_table_info = state_table_info
82 .iter()
83 .map(|(table_id, info)| (TableId::new(*table_id), *info))
84 .collect();
85 let compaction_group_member_tables =
86 Self::build_compaction_group_member_tables(&state_table_info);
87 Self {
88 state_table_info,
89 compaction_group_member_tables,
90 }
91 }
92
93 pub fn to_protobuf(&self) -> HashMap<u32, PbStateTableInfo> {
94 self.state_table_info
95 .iter()
96 .map(|(table_id, info)| (table_id.table_id, *info))
97 .collect()
98 }
99
100 pub fn apply_delta(
101 &mut self,
102 delta: &HashMap<TableId, StateTableInfoDelta>,
103 removed_table_id: &HashSet<TableId>,
104 ) -> (HashMap<TableId, Option<StateTableInfo>>, bool) {
105 let mut changed_table = HashMap::new();
106 let mut has_bumped_committed_epoch = false;
107 fn remove_table_from_compaction_group(
108 compaction_group_member_tables: &mut HashMap<CompactionGroupId, BTreeSet<TableId>>,
109 compaction_group_id: CompactionGroupId,
110 table_id: TableId,
111 ) {
112 let member_tables = compaction_group_member_tables
113 .get_mut(&compaction_group_id)
114 .expect("should exist");
115 assert!(member_tables.remove(&table_id));
116 if member_tables.is_empty() {
117 assert!(
118 compaction_group_member_tables
119 .remove(&compaction_group_id)
120 .is_some()
121 );
122 }
123 }
124 for table_id in removed_table_id {
125 if let Some(prev_info) = self.state_table_info.remove(table_id) {
126 remove_table_from_compaction_group(
127 &mut self.compaction_group_member_tables,
128 prev_info.compaction_group_id,
129 *table_id,
130 );
131 assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
132 } else {
133 warn!(
134 table_id = table_id.table_id,
135 "table to remove does not exist"
136 );
137 }
138 }
139 for (table_id, delta) in delta {
140 if removed_table_id.contains(table_id) {
141 continue;
142 }
143 let new_info = StateTableInfo {
144 committed_epoch: delta.committed_epoch,
145 compaction_group_id: delta.compaction_group_id,
146 };
147 match self.state_table_info.entry(*table_id) {
148 Entry::Occupied(mut entry) => {
149 let prev_info = entry.get_mut();
150 assert!(
151 new_info.committed_epoch >= prev_info.committed_epoch,
152 "state table info regress. table id: {}, prev_info: {:?}, new_info: {:?}",
153 table_id.table_id,
154 prev_info,
155 new_info
156 );
157 if new_info.committed_epoch > prev_info.committed_epoch {
158 has_bumped_committed_epoch = true;
159 }
160 if prev_info.compaction_group_id != new_info.compaction_group_id {
161 remove_table_from_compaction_group(
163 &mut self.compaction_group_member_tables,
164 prev_info.compaction_group_id,
165 *table_id,
166 );
167 assert!(
168 self.compaction_group_member_tables
169 .entry(new_info.compaction_group_id)
170 .or_default()
171 .insert(*table_id)
172 );
173 }
174 let prev_info = replace(prev_info, new_info);
175 changed_table.insert(*table_id, Some(prev_info));
176 }
177 Entry::Vacant(entry) => {
178 assert!(
179 self.compaction_group_member_tables
180 .entry(new_info.compaction_group_id)
181 .or_default()
182 .insert(*table_id)
183 );
184 has_bumped_committed_epoch = true;
185 entry.insert(new_info);
186 changed_table.insert(*table_id, None);
187 }
188 }
189 }
190 debug_assert_eq!(
191 self.compaction_group_member_tables,
192 Self::build_compaction_group_member_tables(&self.state_table_info)
193 );
194 (changed_table, has_bumped_committed_epoch)
195 }
196
197 pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
198 &self.state_table_info
199 }
200
201 pub fn compaction_group_member_table_ids(
202 &self,
203 compaction_group_id: CompactionGroupId,
204 ) -> &BTreeSet<TableId> {
205 static EMPTY_SET: LazyLock<BTreeSet<TableId>> = LazyLock::new(BTreeSet::new);
206 self.compaction_group_member_tables
207 .get(&compaction_group_id)
208 .unwrap_or_else(|| EMPTY_SET.deref())
209 }
210
211 pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
212 &self.compaction_group_member_tables
213 }
214
215 pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
216 self.state_table_info
217 .values()
218 .map(|info| info.committed_epoch)
219 .max()
220 }
221}
222
223#[derive(Debug, Clone, PartialEq)]
224pub struct HummockVersionCommon<T, L = T> {
225 pub id: HummockVersionId,
226 pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
227 #[deprecated]
228 pub(crate) max_committed_epoch: u64,
229 pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
230 pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
231 pub state_table_info: HummockVersionStateTableInfo,
232 pub vector_indexes: HashMap<TableId, VectorIndex>,
233}
234
235pub type HummockVersion = HummockVersionCommon<SstableInfo>;
236
237pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;
238
239impl Default for HummockVersion {
240 fn default() -> Self {
241 HummockVersion::from(&PbHummockVersion::default())
242 }
243}
244
245impl<T> HummockVersionCommon<T>
246where
247 T: for<'a> From<&'a PbSstableInfo>,
248 PbSstableInfo: for<'a> From<&'a T>,
249{
250 pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self {
253 pb_version.into()
254 }
255
256 pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self {
259 pb_version.into()
260 }
261
262 pub fn to_protobuf(&self) -> PbHummockVersion {
263 self.into()
264 }
265}
266
267impl HummockVersion {
268 pub fn estimated_encode_len(&self) -> usize {
269 self.levels.len() * size_of::<CompactionGroupId>()
270 + self
271 .levels
272 .values()
273 .map(|level| level.estimated_encode_len())
274 .sum::<usize>()
275 + self.table_watermarks.len() * size_of::<u32>()
276 + self
277 .table_watermarks
278 .values()
279 .map(|table_watermark| table_watermark.estimated_encode_len())
280 .sum::<usize>()
281 + self
282 .table_change_log
283 .values()
284 .map(|c| {
285 c.iter()
286 .map(|l| {
287 l.old_value
288 .iter()
289 .chain(l.new_value.iter())
290 .map(|s| s.estimated_encode_len())
291 .sum::<usize>()
292 })
293 .sum::<usize>()
294 })
295 .sum::<usize>()
296 }
297}
298
299impl<T> From<&PbHummockVersion> for HummockVersionCommon<T>
300where
301 T: for<'a> From<&'a PbSstableInfo>,
302{
303 fn from(pb_version: &PbHummockVersion) -> Self {
304 #[expect(deprecated)]
305 Self {
306 id: HummockVersionId(pb_version.id),
307 levels: pb_version
308 .levels
309 .iter()
310 .map(|(group_id, levels)| {
311 (*group_id as CompactionGroupId, LevelsCommon::from(levels))
312 })
313 .collect(),
314 max_committed_epoch: pb_version.max_committed_epoch,
315 table_watermarks: pb_version
316 .table_watermarks
317 .iter()
318 .map(|(table_id, table_watermark)| {
319 (
320 TableId::new(*table_id),
321 Arc::new(TableWatermarks::from(table_watermark)),
322 )
323 })
324 .collect(),
325 table_change_log: pb_version
326 .table_change_logs
327 .iter()
328 .map(|(table_id, change_log)| {
329 (
330 TableId::new(*table_id),
331 TableChangeLogCommon::from_protobuf(change_log),
332 )
333 })
334 .collect(),
335 state_table_info: HummockVersionStateTableInfo::from_protobuf(
336 &pb_version.state_table_info,
337 ),
338 vector_indexes: pb_version
339 .vector_indexes
340 .iter()
341 .map(|(table_id, index)| (table_id.into(), index.clone().into()))
342 .collect(),
343 }
344 }
345}
346
347impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
348where
349 PbSstableInfo: for<'a> From<&'a T>,
350{
351 fn from(version: &HummockVersionCommon<T>) -> Self {
352 #[expect(deprecated)]
353 Self {
354 id: version.id.0,
355 levels: version
356 .levels
357 .iter()
358 .map(|(group_id, levels)| (*group_id as _, levels.into()))
359 .collect(),
360 max_committed_epoch: version.max_committed_epoch,
361 table_watermarks: version
362 .table_watermarks
363 .iter()
364 .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
365 .collect(),
366 table_change_logs: version
367 .table_change_log
368 .iter()
369 .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
370 .collect(),
371 state_table_info: version.state_table_info.to_protobuf(),
372 vector_indexes: version
373 .vector_indexes
374 .iter()
375 .map(|(table_id, index)| (table_id.table_id, index.clone().into()))
376 .collect(),
377 }
378 }
379}
380
381impl<T> From<HummockVersionCommon<T>> for PbHummockVersion
382where
383 PbSstableInfo: From<T>,
384 PbSstableInfo: for<'a> From<&'a T>,
385{
386 fn from(version: HummockVersionCommon<T>) -> Self {
387 #[expect(deprecated)]
388 Self {
389 id: version.id.0,
390 levels: version
391 .levels
392 .into_iter()
393 .map(|(group_id, levels)| (group_id as _, levels.into()))
394 .collect(),
395 max_committed_epoch: version.max_committed_epoch,
396 table_watermarks: version
397 .table_watermarks
398 .into_iter()
399 .map(|(table_id, watermark)| (table_id.table_id, watermark.as_ref().into()))
400 .collect(),
401 table_change_logs: version
402 .table_change_log
403 .into_iter()
404 .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
405 .collect(),
406 state_table_info: version.state_table_info.to_protobuf(),
407 vector_indexes: version
408 .vector_indexes
409 .into_iter()
410 .map(|(table_id, index)| (table_id.table_id, index.into()))
411 .collect(),
412 }
413 }
414}
415
416impl HummockVersion {
417 pub fn next_version_id(&self) -> HummockVersionId {
418 self.id.next()
419 }
420
421 pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
422 self.state_table_info.state_table_info.is_empty()
424 && self.levels.values().any(|group| {
425 #[expect(deprecated)]
427 !group.member_table_ids.is_empty()
428 })
429 }
430
431 pub fn may_fill_backward_compatible_state_table_info_delta(
432 &self,
433 delta: &mut HummockVersionDelta,
434 ) {
435 #[expect(deprecated)]
436 for (cg_id, group) in &self.levels {
438 for table_id in &group.member_table_ids {
439 assert!(
440 delta
441 .state_table_info_delta
442 .insert(
443 TableId::new(*table_id),
444 StateTableInfoDelta {
445 committed_epoch: self.max_committed_epoch,
446 compaction_group_id: *cg_id,
447 }
448 )
449 .is_none(),
450 "duplicate table id {} in cg {}",
451 table_id,
452 cg_id
453 );
454 }
455 }
456 }
457
458 pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
459 #[expect(deprecated)]
460 let mut init_version = HummockVersion {
461 id: FIRST_VERSION_ID,
462 levels: Default::default(),
463 max_committed_epoch: INVALID_EPOCH,
464 table_watermarks: HashMap::new(),
465 table_change_log: HashMap::new(),
466 state_table_info: HummockVersionStateTableInfo::empty(),
467 vector_indexes: Default::default(),
468 };
469 for group_id in [
470 StaticCompactionGroupId::StateDefault as CompactionGroupId,
471 StaticCompactionGroupId::MaterializedView as CompactionGroupId,
472 ] {
473 init_version.levels.insert(
474 group_id,
475 build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()),
476 );
477 }
478 init_version
479 }
480
481 pub fn version_delta_after(&self) -> HummockVersionDelta {
482 #[expect(deprecated)]
483 HummockVersionDelta {
484 id: self.next_version_id(),
485 prev_id: self.id,
486 trivial_move: false,
487 max_committed_epoch: self.max_committed_epoch,
488 group_deltas: Default::default(),
489 new_table_watermarks: HashMap::new(),
490 removed_table_ids: HashSet::new(),
491 change_log_delta: HashMap::new(),
492 state_table_info_delta: Default::default(),
493 vector_index_delta: Default::default(),
494 }
495 }
496
497 pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
498 let table_change_log = {
499 let mut table_change_log = HashMap::new();
500 for (table_id, log) in &mut self.table_change_log {
501 let change_log_iter =
502 log.change_log_iter_mut()
503 .map(|item| EpochNewChangeLogCommon {
504 new_value: std::mem::take(&mut item.new_value),
505 old_value: std::mem::take(&mut item.old_value),
506 non_checkpoint_epochs: item.non_checkpoint_epochs.clone(),
507 checkpoint_epoch: item.checkpoint_epoch,
508 });
509 table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
510 }
511
512 table_change_log
513 };
514
515 let local_version = LocalHummockVersion::from(self);
516
517 (local_version, table_change_log)
518 }
519}
520
521impl<T, L> HummockVersionCommon<T, L> {
522 pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
523 self.state_table_info
524 .info()
525 .get(&table_id)
526 .map(|info| info.committed_epoch)
527 }
528}
529
530#[derive(Debug, PartialEq, Clone)]
531pub struct HummockVersionDeltaCommon<T, L = T> {
532 pub id: HummockVersionId,
533 pub prev_id: HummockVersionId,
534 pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
535 #[deprecated]
536 pub(crate) max_committed_epoch: u64,
537 pub trivial_move: bool,
538 pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
539 pub removed_table_ids: HashSet<TableId>,
540 pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
541 pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
542 pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
543}
544
545pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;
546
547pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;
548
549impl Default for HummockVersionDelta {
550 fn default() -> Self {
551 HummockVersionDelta::from(&PbHummockVersionDelta::default())
552 }
553}
554
555impl<T> HummockVersionDeltaCommon<T>
556where
557 T: for<'a> From<&'a PbSstableInfo>,
558 PbSstableInfo: for<'a> From<&'a T>,
559{
560 pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self {
563 delta.into()
564 }
565
566 pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self {
569 delta.into()
570 }
571
572 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
573 self.into()
574 }
575}
576
577pub trait SstableIdReader {
578 fn sst_id(&self) -> HummockSstableId;
579}
580
581pub trait ObjectIdReader {
582 fn object_id(&self) -> HummockSstableObjectId;
583}
584
585impl<T> HummockVersionDeltaCommon<T>
586where
587 T: SstableIdReader + ObjectIdReader,
588{
589 pub fn newly_added_object_ids(
594 &self,
595 exclude_table_change_log: bool,
596 ) -> HashSet<HummockObjectId> {
597 match HummockObjectId::Sstable(0.into()) {
601 HummockObjectId::Sstable(_) => {}
602 HummockObjectId::VectorFile(_) => {}
603 };
604 self.newly_added_sst_infos(exclude_table_change_log)
605 .map(|sst| HummockObjectId::Sstable(sst.object_id()))
606 .chain(
607 self.vector_index_delta
608 .values()
609 .flat_map(|vector_index_delta| {
610 vector_index_delta
611 .newly_added_objects()
612 .map(|(object_id, _)| object_id)
613 }),
614 )
615 .collect()
616 }
617
618 pub fn newly_added_sst_ids(&self, exclude_table_change_log: bool) -> HashSet<HummockSstableId> {
619 self.newly_added_sst_infos(exclude_table_change_log)
620 .map(|sst| sst.sst_id())
621 .collect()
622 }
623}
624
625impl<T> HummockVersionDeltaCommon<T> {
626 pub fn newly_added_sst_infos(
627 &self,
628 exclude_table_change_log: bool,
629 ) -> impl Iterator<Item = &'_ T> {
630 let may_table_change_delta = if exclude_table_change_log {
631 None
632 } else {
633 Some(self.change_log_delta.values())
634 };
635 self.group_deltas
636 .values()
637 .flat_map(|group_deltas| {
638 group_deltas.group_deltas.iter().flat_map(|group_delta| {
639 let sst_slice = match &group_delta {
640 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos)
641 | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon {
642 inserted_table_infos,
643 ..
644 }) => Some(inserted_table_infos.iter()),
645 GroupDeltaCommon::GroupConstruct(_)
646 | GroupDeltaCommon::GroupDestroy(_)
647 | GroupDeltaCommon::GroupMerge(_)
648 | GroupDeltaCommon::TruncateTables(_) => None,
649 };
650 sst_slice.into_iter().flatten()
651 })
652 })
653 .chain(
654 may_table_change_delta
655 .map(|v| {
656 v.flat_map(|delta| {
657 let new_log = &delta.new_log;
658 new_log.new_value.iter().chain(new_log.old_value.iter())
659 })
660 })
661 .into_iter()
662 .flatten(),
663 )
664 }
665}
666
667impl HummockVersionDelta {
668 #[expect(deprecated)]
669 pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
670 self.max_committed_epoch
671 }
672}
673
674impl<T> From<&PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
675where
676 T: for<'a> From<&'a PbSstableInfo>,
677{
678 fn from(pb_version_delta: &PbHummockVersionDelta) -> Self {
679 #[expect(deprecated)]
680 Self {
681 id: HummockVersionId(pb_version_delta.id),
682 prev_id: HummockVersionId(pb_version_delta.prev_id),
683 group_deltas: pb_version_delta
684 .group_deltas
685 .iter()
686 .map(|(group_id, deltas)| {
687 (
688 *group_id as CompactionGroupId,
689 GroupDeltasCommon::from(deltas),
690 )
691 })
692 .collect(),
693 max_committed_epoch: pb_version_delta.max_committed_epoch,
694 trivial_move: pb_version_delta.trivial_move,
695 new_table_watermarks: pb_version_delta
696 .new_table_watermarks
697 .iter()
698 .map(|(table_id, watermarks)| {
699 (TableId::new(*table_id), TableWatermarks::from(watermarks))
700 })
701 .collect(),
702 removed_table_ids: pb_version_delta
703 .removed_table_ids
704 .iter()
705 .map(|table_id| TableId::new(*table_id))
706 .collect(),
707 change_log_delta: pb_version_delta
708 .change_log_delta
709 .iter()
710 .map(|(table_id, log_delta)| {
711 (
712 TableId::new(*table_id),
713 ChangeLogDeltaCommon {
714 truncate_epoch: log_delta.truncate_epoch,
715 new_log: log_delta.new_log.as_ref().unwrap().into(),
716 },
717 )
718 })
719 .collect(),
720
721 state_table_info_delta: pb_version_delta
722 .state_table_info_delta
723 .iter()
724 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
725 .collect(),
726 vector_index_delta: pb_version_delta
727 .vector_index_delta
728 .iter()
729 .map(|(table_id, delta)| (TableId::new(*table_id), delta.clone().into()))
730 .collect(),
731 }
732 }
733}
734
735impl<T> From<&HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
736where
737 PbSstableInfo: for<'a> From<&'a T>,
738{
739 fn from(version_delta: &HummockVersionDeltaCommon<T>) -> Self {
740 #[expect(deprecated)]
741 Self {
742 id: version_delta.id.0,
743 prev_id: version_delta.prev_id.0,
744 group_deltas: version_delta
745 .group_deltas
746 .iter()
747 .map(|(group_id, deltas)| (*group_id as _, deltas.into()))
748 .collect(),
749 max_committed_epoch: version_delta.max_committed_epoch,
750 trivial_move: version_delta.trivial_move,
751 new_table_watermarks: version_delta
752 .new_table_watermarks
753 .iter()
754 .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
755 .collect(),
756 removed_table_ids: version_delta
757 .removed_table_ids
758 .iter()
759 .map(|table_id| table_id.table_id)
760 .collect(),
761 change_log_delta: version_delta
762 .change_log_delta
763 .iter()
764 .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
765 .collect(),
766 state_table_info_delta: version_delta
767 .state_table_info_delta
768 .iter()
769 .map(|(table_id, delta)| (table_id.table_id, *delta))
770 .collect(),
771 vector_index_delta: version_delta
772 .vector_index_delta
773 .iter()
774 .map(|(table_id, delta)| (table_id.table_id, delta.clone().into()))
775 .collect(),
776 }
777 }
778}
779
780impl<T> From<HummockVersionDeltaCommon<T>> for PbHummockVersionDelta
781where
782 PbSstableInfo: From<T>,
783{
784 fn from(version_delta: HummockVersionDeltaCommon<T>) -> Self {
785 #[expect(deprecated)]
786 Self {
787 id: version_delta.id.0,
788 prev_id: version_delta.prev_id.0,
789 group_deltas: version_delta
790 .group_deltas
791 .into_iter()
792 .map(|(group_id, deltas)| (group_id as _, deltas.into()))
793 .collect(),
794 max_committed_epoch: version_delta.max_committed_epoch,
795 trivial_move: version_delta.trivial_move,
796 new_table_watermarks: version_delta
797 .new_table_watermarks
798 .into_iter()
799 .map(|(table_id, watermarks)| (table_id.table_id, watermarks.into()))
800 .collect(),
801 removed_table_ids: version_delta
802 .removed_table_ids
803 .into_iter()
804 .map(|table_id| table_id.table_id)
805 .collect(),
806 change_log_delta: version_delta
807 .change_log_delta
808 .into_iter()
809 .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into()))
810 .collect(),
811 state_table_info_delta: version_delta
812 .state_table_info_delta
813 .into_iter()
814 .map(|(table_id, delta)| (table_id.table_id, delta))
815 .collect(),
816 vector_index_delta: version_delta
817 .vector_index_delta
818 .into_iter()
819 .map(|(table_id, delta)| (table_id.table_id, delta.into()))
820 .collect(),
821 }
822 }
823}
824
825impl<T> From<PbHummockVersionDelta> for HummockVersionDeltaCommon<T>
826where
827 T: From<PbSstableInfo>,
828{
829 fn from(pb_version_delta: PbHummockVersionDelta) -> Self {
830 #[expect(deprecated)]
831 Self {
832 id: HummockVersionId(pb_version_delta.id),
833 prev_id: HummockVersionId(pb_version_delta.prev_id),
834 group_deltas: pb_version_delta
835 .group_deltas
836 .into_iter()
837 .map(|(group_id, deltas)| (group_id as CompactionGroupId, deltas.into()))
838 .collect(),
839 max_committed_epoch: pb_version_delta.max_committed_epoch,
840 trivial_move: pb_version_delta.trivial_move,
841 new_table_watermarks: pb_version_delta
842 .new_table_watermarks
843 .into_iter()
844 .map(|(table_id, watermarks)| (TableId::new(table_id), watermarks.into()))
845 .collect(),
846 removed_table_ids: pb_version_delta
847 .removed_table_ids
848 .into_iter()
849 .map(TableId::new)
850 .collect(),
851 change_log_delta: pb_version_delta
852 .change_log_delta
853 .iter()
854 .map(|(table_id, log_delta)| {
855 (
856 TableId::new(*table_id),
857 ChangeLogDeltaCommon {
858 new_log: log_delta.new_log.clone().unwrap().into(),
859 truncate_epoch: log_delta.truncate_epoch,
860 },
861 )
862 })
863 .collect(),
864 state_table_info_delta: pb_version_delta
865 .state_table_info_delta
866 .iter()
867 .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
868 .collect(),
869 vector_index_delta: pb_version_delta
870 .vector_index_delta
871 .into_iter()
872 .map(|(table_id, delta)| (TableId::new(table_id), delta.into()))
873 .collect(),
874 }
875 }
876}
877
878#[derive(Debug, PartialEq, Clone)]
879pub struct IntraLevelDeltaCommon<T> {
880 pub level_idx: u32,
881 pub l0_sub_level_id: u64,
882 pub removed_table_ids: HashSet<HummockSstableId>,
883 pub inserted_table_infos: Vec<T>,
884 pub vnode_partition_count: u32,
885 pub compaction_group_version_id: u64,
886}
887
888pub type IntraLevelDelta = IntraLevelDeltaCommon<SstableInfo>;
889
890impl IntraLevelDelta {
891 pub fn estimated_encode_len(&self) -> usize {
892 size_of::<u32>()
893 + size_of::<u64>()
894 + self.removed_table_ids.len() * size_of::<u32>()
895 + self
896 .inserted_table_infos
897 .iter()
898 .map(|sst| sst.estimated_encode_len())
899 .sum::<usize>()
900 + size_of::<u32>()
901 }
902}
903
904impl<T> From<PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
905where
906 T: From<PbSstableInfo>,
907{
908 fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self {
909 Self {
910 level_idx: pb_intra_level_delta.level_idx,
911 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
912 removed_table_ids: HashSet::from_iter(
913 pb_intra_level_delta
914 .removed_table_ids
915 .iter()
916 .map(|sst_id| (*sst_id).into()),
917 ),
918 inserted_table_infos: pb_intra_level_delta
919 .inserted_table_infos
920 .into_iter()
921 .map(Into::into)
922 .collect_vec(),
923 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
924 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
925 }
926 }
927}
928
929impl<T> From<IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
930where
931 PbSstableInfo: From<T>,
932{
933 fn from(intra_level_delta: IntraLevelDeltaCommon<T>) -> Self {
934 Self {
935 level_idx: intra_level_delta.level_idx,
936 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
937 removed_table_ids: intra_level_delta
938 .removed_table_ids
939 .into_iter()
940 .map(|sst_id| sst_id.inner())
941 .collect(),
942 inserted_table_infos: intra_level_delta
943 .inserted_table_infos
944 .into_iter()
945 .map(Into::into)
946 .collect_vec(),
947 vnode_partition_count: intra_level_delta.vnode_partition_count,
948 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
949 }
950 }
951}
952
953impl<T> From<&IntraLevelDeltaCommon<T>> for PbIntraLevelDelta
954where
955 PbSstableInfo: for<'a> From<&'a T>,
956{
957 fn from(intra_level_delta: &IntraLevelDeltaCommon<T>) -> Self {
958 Self {
959 level_idx: intra_level_delta.level_idx,
960 l0_sub_level_id: intra_level_delta.l0_sub_level_id,
961 removed_table_ids: intra_level_delta
962 .removed_table_ids
963 .iter()
964 .map(|sst_id| sst_id.inner())
965 .collect(),
966 inserted_table_infos: intra_level_delta
967 .inserted_table_infos
968 .iter()
969 .map(Into::into)
970 .collect_vec(),
971 vnode_partition_count: intra_level_delta.vnode_partition_count,
972 compaction_group_version_id: intra_level_delta.compaction_group_version_id,
973 }
974 }
975}
976
977impl<T> From<&PbIntraLevelDelta> for IntraLevelDeltaCommon<T>
978where
979 T: for<'a> From<&'a PbSstableInfo>,
980{
981 fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self {
982 Self {
983 level_idx: pb_intra_level_delta.level_idx,
984 l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id,
985 removed_table_ids: HashSet::from_iter(
986 pb_intra_level_delta
987 .removed_table_ids
988 .iter()
989 .map(|sst_id| (*sst_id).into()),
990 ),
991 inserted_table_infos: pb_intra_level_delta
992 .inserted_table_infos
993 .iter()
994 .map(Into::into)
995 .collect_vec(),
996 vnode_partition_count: pb_intra_level_delta.vnode_partition_count,
997 compaction_group_version_id: pb_intra_level_delta.compaction_group_version_id,
998 }
999 }
1000}
1001
1002impl IntraLevelDelta {
1003 pub fn new(
1004 level_idx: u32,
1005 l0_sub_level_id: u64,
1006 removed_table_ids: HashSet<HummockSstableId>,
1007 inserted_table_infos: Vec<SstableInfo>,
1008 vnode_partition_count: u32,
1009 compaction_group_version_id: u64,
1010 ) -> Self {
1011 Self {
1012 level_idx,
1013 l0_sub_level_id,
1014 removed_table_ids,
1015 inserted_table_infos,
1016 vnode_partition_count,
1017 compaction_group_version_id,
1018 }
1019 }
1020}
1021
1022#[derive(Debug, PartialEq, Clone)]
1023pub enum GroupDeltaCommon<T> {
1024 NewL0SubLevel(Vec<T>),
1025 IntraLevel(IntraLevelDeltaCommon<T>),
1026 GroupConstruct(Box<PbGroupConstruct>),
1027 GroupDestroy(PbGroupDestroy),
1028 GroupMerge(PbGroupMerge),
1029 TruncateTables(Vec<u32>),
1030}
1031
1032pub type GroupDelta = GroupDeltaCommon<SstableInfo>;
1033
1034impl<T> From<PbGroupDelta> for GroupDeltaCommon<T>
1035where
1036 T: From<PbSstableInfo>,
1037{
1038 fn from(pb_group_delta: PbGroupDelta) -> Self {
1039 match pb_group_delta.delta_type {
1040 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1041 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1042 }
1043 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1044 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
1045 }
1046 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1047 GroupDeltaCommon::GroupDestroy(pb_group_destroy)
1048 }
1049 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1050 GroupDeltaCommon::GroupMerge(pb_group_merge)
1051 }
1052 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1053 pb_new_sub_level
1054 .inserted_table_infos
1055 .into_iter()
1056 .map(T::from)
1057 .collect(),
1058 ),
1059 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1060 GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids)
1061 }
1062
1063 None => panic!("delta_type is not set"),
1064 }
1065 }
1066}
1067
1068impl<T> From<GroupDeltaCommon<T>> for PbGroupDelta
1069where
1070 PbSstableInfo: From<T>,
1071{
1072 fn from(group_delta: GroupDeltaCommon<T>) -> Self {
1073 match group_delta {
1074 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1075 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1076 },
1077 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1078 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
1079 },
1080 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1081 delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
1082 },
1083 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1084 delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
1085 },
1086 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1087 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1088 inserted_table_infos: new_sub_level
1089 .into_iter()
1090 .map(PbSstableInfo::from)
1091 .collect(),
1092 })),
1093 },
1094 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1095 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables { table_ids })),
1096 },
1097 }
1098 }
1099}
1100
1101impl<T> From<&GroupDeltaCommon<T>> for PbGroupDelta
1102where
1103 PbSstableInfo: for<'a> From<&'a T>,
1104{
1105 fn from(group_delta: &GroupDeltaCommon<T>) -> Self {
1106 match group_delta {
1107 GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta {
1108 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
1109 },
1110 GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
1111 delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
1112 },
1113 GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
1114 delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
1115 },
1116 GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
1117 delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
1118 },
1119 GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta {
1120 delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel {
1121 inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(),
1122 })),
1123 },
1124 GroupDeltaCommon::TruncateTables(table_ids) => PbGroupDelta {
1125 delta_type: Some(PbDeltaType::TruncateTables(PbTruncateTables {
1126 table_ids: table_ids.clone(),
1127 })),
1128 },
1129 }
1130 }
1131}
1132
1133impl<T> From<&PbGroupDelta> for GroupDeltaCommon<T>
1134where
1135 T: for<'a> From<&'a PbSstableInfo>,
1136{
1137 fn from(pb_group_delta: &PbGroupDelta) -> Self {
1138 match &pb_group_delta.delta_type {
1139 Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => {
1140 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
1141 }
1142 Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
1143 GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
1144 }
1145 Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
1146 GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
1147 }
1148 Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
1149 GroupDeltaCommon::GroupMerge(*pb_group_merge)
1150 }
1151 Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel(
1152 pb_new_sub_level
1153 .inserted_table_infos
1154 .iter()
1155 .map(T::from)
1156 .collect(),
1157 ),
1158 Some(PbDeltaType::TruncateTables(pb_truncate_tables)) => {
1159 GroupDeltaCommon::TruncateTables(pb_truncate_tables.table_ids.clone())
1160 }
1161 None => panic!("delta_type is not set"),
1162 }
1163 }
1164}
1165
1166#[derive(Debug, PartialEq, Clone)]
1167pub struct GroupDeltasCommon<T> {
1168 pub group_deltas: Vec<GroupDeltaCommon<T>>,
1169}
1170
1171impl<T> Default for GroupDeltasCommon<T> {
1172 fn default() -> Self {
1173 Self {
1174 group_deltas: vec![],
1175 }
1176 }
1177}
1178
1179pub type GroupDeltas = GroupDeltasCommon<SstableInfo>;
1180
1181impl<T> From<PbGroupDeltas> for GroupDeltasCommon<T>
1182where
1183 T: From<PbSstableInfo>,
1184{
1185 fn from(pb_group_deltas: PbGroupDeltas) -> Self {
1186 Self {
1187 group_deltas: pb_group_deltas
1188 .group_deltas
1189 .into_iter()
1190 .map(GroupDeltaCommon::from)
1191 .collect_vec(),
1192 }
1193 }
1194}
1195
1196impl<T> From<GroupDeltasCommon<T>> for PbGroupDeltas
1197where
1198 PbSstableInfo: From<T>,
1199{
1200 fn from(group_deltas: GroupDeltasCommon<T>) -> Self {
1201 Self {
1202 group_deltas: group_deltas
1203 .group_deltas
1204 .into_iter()
1205 .map(|group_delta| group_delta.into())
1206 .collect_vec(),
1207 }
1208 }
1209}
1210
1211impl<T> From<&GroupDeltasCommon<T>> for PbGroupDeltas
1212where
1213 PbSstableInfo: for<'a> From<&'a T>,
1214{
1215 fn from(group_deltas: &GroupDeltasCommon<T>) -> Self {
1216 Self {
1217 group_deltas: group_deltas
1218 .group_deltas
1219 .iter()
1220 .map(|group_delta| group_delta.into())
1221 .collect_vec(),
1222 }
1223 }
1224}
1225
1226impl<T> From<&PbGroupDeltas> for GroupDeltasCommon<T>
1227where
1228 T: for<'a> From<&'a PbSstableInfo>,
1229{
1230 fn from(pb_group_deltas: &PbGroupDeltas) -> Self {
1231 Self {
1232 group_deltas: pb_group_deltas
1233 .group_deltas
1234 .iter()
1235 .map(GroupDeltaCommon::from)
1236 .collect_vec(),
1237 }
1238 }
1239}
1240
1241impl<T> GroupDeltasCommon<T>
1242where
1243 PbSstableInfo: for<'a> From<&'a T>,
1244{
1245 pub fn to_protobuf(&self) -> PbGroupDeltas {
1246 self.into()
1247 }
1248}
1249
1250impl From<HummockVersionDelta> for LocalHummockVersionDelta {
1251 #[expect(deprecated)]
1252 fn from(delta: HummockVersionDelta) -> Self {
1253 Self {
1254 id: delta.id,
1255 prev_id: delta.prev_id,
1256 group_deltas: delta.group_deltas,
1257 max_committed_epoch: delta.max_committed_epoch,
1258 trivial_move: delta.trivial_move,
1259 new_table_watermarks: delta.new_table_watermarks,
1260 removed_table_ids: delta.removed_table_ids,
1261 change_log_delta: delta
1262 .change_log_delta
1263 .into_iter()
1264 .map(|(k, v)| {
1265 (
1266 k,
1267 ChangeLogDeltaCommon {
1268 truncate_epoch: v.truncate_epoch,
1269 new_log: EpochNewChangeLogCommon {
1270 new_value: Vec::new(),
1271 old_value: Vec::new(),
1272 non_checkpoint_epochs: v.new_log.non_checkpoint_epochs,
1273 checkpoint_epoch: v.new_log.checkpoint_epoch,
1274 },
1275 },
1276 )
1277 })
1278 .collect(),
1279 state_table_info_delta: delta.state_table_info_delta,
1280 vector_index_delta: delta.vector_index_delta,
1281 }
1282 }
1283}
1284
1285impl From<HummockVersion> for LocalHummockVersion {
1286 #[expect(deprecated)]
1287 fn from(version: HummockVersion) -> Self {
1288 Self {
1289 id: version.id,
1290 levels: version.levels,
1291 max_committed_epoch: version.max_committed_epoch,
1292 table_watermarks: version.table_watermarks,
1293 table_change_log: version
1294 .table_change_log
1295 .into_iter()
1296 .map(|(k, v)| {
1297 let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
1298 .change_log_into_iter()
1299 .map(|epoch_new_change_log| EpochNewChangeLogCommon {
1300 new_value: Vec::new(),
1301 old_value: Vec::new(),
1302 non_checkpoint_epochs: epoch_new_change_log.non_checkpoint_epochs,
1303 checkpoint_epoch: epoch_new_change_log.checkpoint_epoch,
1304 })
1305 .collect();
1306 (k, TableChangeLogCommon::new(epoch_new_change_logs))
1307 })
1308 .collect(),
1309 state_table_info: version.state_table_info,
1310 vector_indexes: version.vector_indexes,
1311 }
1312 }
1313}