risingwave_hummock_sdk/
frontend_version.rs1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::TableId;
18use risingwave_common::util::epoch::INVALID_EPOCH;
19use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
20use risingwave_pb::hummock::{
21 PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbSstableInfo,
22 StateTableInfoDelta,
23};
24
25use crate::change_log::{ChangeLogDeltaCommon, EpochNewChangeLogCommon, resolve_pb_log_epochs};
26use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo};
27use crate::{HummockVersionId, INVALID_VERSION_ID};
28
29#[derive(Clone, Debug)]
30pub struct FrontendHummockVersion {
31 pub id: HummockVersionId,
32 pub state_table_info: HummockVersionStateTableInfo,
33}
34
35impl FrontendHummockVersion {
36 pub fn from_version(version: &HummockVersion) -> Self {
37 Self {
38 id: version.id,
39 state_table_info: version.state_table_info.clone(),
40 }
41 }
42
43 pub fn to_protobuf(&self) -> PbHummockVersion {
44 #[expect(deprecated)]
45 PbHummockVersion {
46 id: self.id,
47 levels: Default::default(),
48 max_committed_epoch: INVALID_EPOCH,
49 table_watermarks: Default::default(),
50 table_change_logs: Default::default(),
51 state_table_info: self.state_table_info.info().clone(),
52 vector_indexes: Default::default(),
53 }
54 }
55
56 pub fn from_protobuf(value: PbHummockVersion) -> Self {
57 Self {
58 id: value.id,
59 state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info),
60 }
61 }
62
63 pub fn apply_delta(&mut self, delta: FrontendHummockVersionDelta) {
64 if self.id != INVALID_VERSION_ID {
65 assert_eq!(self.id, delta.prev_id);
66 }
67 self.id = delta.id;
68 self.state_table_info
69 .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id);
70 }
71}
72
73pub struct FrontendHummockVersionDelta {
74 pub prev_id: HummockVersionId,
75 pub id: HummockVersionId,
76 pub removed_table_id: HashSet<TableId>,
77 pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
78 pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<()>>,
79}
80
81impl FrontendHummockVersionDelta {
82 pub fn from_delta(delta: &HummockVersionDelta) -> Self {
83 Self {
84 prev_id: delta.prev_id,
85 id: delta.id,
86 removed_table_id: delta.removed_table_ids.clone(),
87 state_table_info_delta: delta.state_table_info_delta.clone(),
88 change_log_delta: delta
89 .change_log_delta
90 .iter()
91 .map(|(table_id, change_log_delta)| {
92 (
93 *table_id,
94 ChangeLogDeltaCommon {
95 truncate_epoch: change_log_delta.truncate_epoch,
96 new_log: EpochNewChangeLogCommon {
97 new_value: vec![(); change_log_delta.new_log.new_value.len()],
99 old_value: vec![(); change_log_delta.new_log.old_value.len()],
100 non_checkpoint_epochs: change_log_delta
101 .new_log
102 .non_checkpoint_epochs
103 .clone(),
104 checkpoint_epoch: change_log_delta.new_log.checkpoint_epoch,
105 },
106 },
107 )
108 })
109 .collect(),
110 }
111 }
112
113 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
114 #[expect(deprecated)]
115 PbHummockVersionDelta {
116 id: self.id,
117 prev_id: self.prev_id,
118 group_deltas: Default::default(),
119 max_committed_epoch: INVALID_EPOCH,
120 trivial_move: false,
121 new_table_watermarks: Default::default(),
122 removed_table_ids: self.removed_table_id.iter().copied().collect(),
123 change_log_delta: self
124 .change_log_delta
125 .iter()
126 .map(|(table_id, delta)| {
127 (
128 *table_id,
129 PbChangeLogDelta {
130 new_log: Some(PbEpochNewChangeLog {
131 old_value: vec![
133 PbSstableInfo::default();
134 delta.new_log.old_value.len()
135 ],
136 new_value: vec![
137 PbSstableInfo::default();
138 delta.new_log.new_value.len()
139 ],
140 epochs: delta.new_log.epochs().collect(),
141 }),
142 truncate_epoch: delta.truncate_epoch,
143 },
144 )
145 })
146 .collect(),
147 state_table_info_delta: self.state_table_info_delta.clone(),
148 vector_index_delta: Default::default(),
149 }
150 }
151
152 pub fn from_protobuf(delta: PbHummockVersionDelta) -> Self {
153 Self {
154 prev_id: delta.prev_id,
155 id: delta.id,
156 removed_table_id: delta.removed_table_ids.into_iter().collect(),
157 state_table_info_delta: delta
158 .state_table_info_delta
159 .iter()
160 .map(|(table_id, delta)| ((*table_id), *delta))
161 .collect(),
162 change_log_delta: delta
163 .change_log_delta
164 .iter()
165 .map(|(table_id, change_log_delta)| {
166 (
167 *table_id,
168 ChangeLogDeltaCommon {
169 truncate_epoch: change_log_delta.truncate_epoch,
170 new_log: change_log_delta
171 .new_log
172 .as_ref()
173 .map(|new_log| {
174 let (non_checkpoint_epochs, checkpoint_epoch) =
175 resolve_pb_log_epochs(&new_log.epochs);
176 EpochNewChangeLogCommon {
177 new_value: vec![(); new_log.new_value.len()],
179 old_value: vec![(); new_log.old_value.len()],
180 non_checkpoint_epochs,
181 checkpoint_epoch,
182 }
183 })
184 .unwrap(),
185 },
186 )
187 })
188 .collect(),
189 }
190 }
191}