1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::TableId;
18use risingwave_common::util::epoch::INVALID_EPOCH;
19use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
20use risingwave_pb::hummock::{
21 PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbSstableInfo, PbTableChangeLog,
22 StateTableInfoDelta,
23};
24
25use crate::change_log::{
26 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon, resolve_pb_log_epochs,
27};
28use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo};
29use crate::{HummockVersionId, INVALID_VERSION_ID};
30
31#[derive(Clone, Debug)]
32pub struct FrontendHummockVersion {
33 pub id: HummockVersionId,
34 pub state_table_info: HummockVersionStateTableInfo,
35 pub table_change_log: HashMap<TableId, TableChangeLogCommon<()>>,
36}
37
38impl FrontendHummockVersion {
39 pub fn from_version(version: &HummockVersion) -> Self {
40 Self {
41 id: version.id,
42 state_table_info: version.state_table_info.clone(),
43 table_change_log: version
44 .table_change_log
45 .iter()
46 .map(|(table_id, change_log)| {
47 (
48 *table_id,
49 TableChangeLogCommon::new(change_log.iter().map(|change_log| {
50 EpochNewChangeLogCommon {
51 new_value: vec![(); change_log.new_value.len()],
52 old_value: vec![(); change_log.new_value.len()],
53 non_checkpoint_epochs: change_log.non_checkpoint_epochs.clone(),
54 checkpoint_epoch: change_log.checkpoint_epoch,
55 }
56 })),
57 )
58 })
59 .collect(),
60 }
61 }
62
63 pub fn to_protobuf(&self) -> PbHummockVersion {
64 #[expect(deprecated)]
65 PbHummockVersion {
66 id: self.id.0,
67 levels: Default::default(),
68 max_committed_epoch: INVALID_EPOCH,
69 table_watermarks: Default::default(),
70 table_change_logs: self
71 .table_change_log
72 .iter()
73 .map(|(table_id, change_log)| {
74 (
75 table_id.table_id,
76 PbTableChangeLog {
77 change_logs: change_log
78 .iter()
79 .map(|change_log| PbEpochNewChangeLog {
80 old_value: vec![
81 PbSstableInfo::default();
82 change_log.old_value.len()
83 ],
84 new_value: vec![
85 PbSstableInfo::default();
86 change_log.new_value.len()
87 ],
88 epochs: change_log.epochs().collect(),
89 })
90 .collect(),
91 },
92 )
93 })
94 .collect(),
95 state_table_info: self.state_table_info.to_protobuf(),
96 }
97 }
98
99 pub fn from_protobuf(value: PbHummockVersion) -> Self {
100 Self {
101 id: HummockVersionId(value.id),
102 state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info),
103 table_change_log: value
104 .table_change_logs
105 .into_iter()
106 .map(|(table_id, change_log)| {
107 (
108 TableId::new(table_id),
109 TableChangeLogCommon::new(change_log.change_logs.into_iter().map(
110 |change_log| {
111 let (non_checkpoint_epochs, checkpoint_epoch) =
112 resolve_pb_log_epochs(&change_log.epochs);
113 EpochNewChangeLogCommon {
114 new_value: vec![(); change_log.new_value.len()],
116 old_value: vec![(); change_log.old_value.len()],
117 non_checkpoint_epochs,
118 checkpoint_epoch,
119 }
120 },
121 )),
122 )
123 })
124 .collect(),
125 }
126 }
127
128 pub fn apply_delta(&mut self, delta: FrontendHummockVersionDelta) {
129 if self.id != INVALID_VERSION_ID {
130 assert_eq!(self.id, delta.prev_id);
131 }
132 self.id = delta.id;
133 let (changed_table_info, _) = self
134 .state_table_info
135 .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id);
136 HummockVersion::apply_change_log_delta(
137 &mut self.table_change_log,
138 &delta.change_log_delta,
139 &delta.removed_table_id,
140 &delta.state_table_info_delta,
141 &changed_table_info,
142 );
143 }
144}
145
146pub struct FrontendHummockVersionDelta {
147 pub prev_id: HummockVersionId,
148 pub id: HummockVersionId,
149 pub removed_table_id: HashSet<TableId>,
150 pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
151 pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<()>>,
152}
153
154impl FrontendHummockVersionDelta {
155 pub fn from_delta(delta: &HummockVersionDelta) -> Self {
156 Self {
157 prev_id: delta.prev_id,
158 id: delta.id,
159 removed_table_id: delta.removed_table_ids.clone(),
160 state_table_info_delta: delta.state_table_info_delta.clone(),
161 change_log_delta: delta
162 .change_log_delta
163 .iter()
164 .map(|(table_id, change_log_delta)| {
165 (
166 *table_id,
167 ChangeLogDeltaCommon {
168 truncate_epoch: change_log_delta.truncate_epoch,
169 new_log: EpochNewChangeLogCommon {
170 new_value: vec![(); change_log_delta.new_log.new_value.len()],
172 old_value: vec![(); change_log_delta.new_log.old_value.len()],
173 non_checkpoint_epochs: change_log_delta
174 .new_log
175 .non_checkpoint_epochs
176 .clone(),
177 checkpoint_epoch: change_log_delta.new_log.checkpoint_epoch,
178 },
179 },
180 )
181 })
182 .collect(),
183 }
184 }
185
186 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
187 #[expect(deprecated)]
188 PbHummockVersionDelta {
189 id: self.id.to_u64(),
190 prev_id: self.prev_id.to_u64(),
191 group_deltas: Default::default(),
192 max_committed_epoch: INVALID_EPOCH,
193 trivial_move: false,
194 new_table_watermarks: Default::default(),
195 removed_table_ids: self
196 .removed_table_id
197 .iter()
198 .map(|table_id| table_id.table_id)
199 .collect(),
200 change_log_delta: self
201 .change_log_delta
202 .iter()
203 .map(|(table_id, delta)| {
204 (
205 table_id.table_id,
206 PbChangeLogDelta {
207 new_log: Some(PbEpochNewChangeLog {
208 old_value: vec![
210 PbSstableInfo::default();
211 delta.new_log.old_value.len()
212 ],
213 new_value: vec![
214 PbSstableInfo::default();
215 delta.new_log.new_value.len()
216 ],
217 epochs: delta.new_log.epochs().collect(),
218 }),
219 truncate_epoch: delta.truncate_epoch,
220 },
221 )
222 })
223 .collect(),
224 state_table_info_delta: self
225 .state_table_info_delta
226 .iter()
227 .map(|(table_id, delta)| (table_id.table_id, *delta))
228 .collect(),
229 }
230 }
231
232 pub fn from_protobuf(delta: PbHummockVersionDelta) -> Self {
233 Self {
234 prev_id: HummockVersionId::new(delta.prev_id),
235 id: HummockVersionId::new(delta.id),
236 removed_table_id: delta
237 .removed_table_ids
238 .iter()
239 .map(|table_id| TableId::new(*table_id))
240 .collect(),
241 state_table_info_delta: delta
242 .state_table_info_delta
243 .into_iter()
244 .map(|(table_id, delta)| (TableId::new(table_id), delta))
245 .collect(),
246 change_log_delta: delta
247 .change_log_delta
248 .iter()
249 .map(|(table_id, change_log_delta)| {
250 (
251 TableId::new(*table_id),
252 ChangeLogDeltaCommon {
253 truncate_epoch: change_log_delta.truncate_epoch,
254 new_log: change_log_delta
255 .new_log
256 .as_ref()
257 .map(|new_log| {
258 let (non_checkpoint_epochs, checkpoint_epoch) =
259 resolve_pb_log_epochs(&new_log.epochs);
260 EpochNewChangeLogCommon {
261 new_value: vec![(); new_log.new_value.len()],
263 old_value: vec![(); new_log.old_value.len()],
264 non_checkpoint_epochs,
265 checkpoint_epoch,
266 }
267 })
268 .unwrap(),
269 },
270 )
271 })
272 .collect(),
273 }
274 }
275}