1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::TableId;
18use risingwave_common::util::epoch::INVALID_EPOCH;
19use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
20use risingwave_pb::hummock::{
21 PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbSstableInfo, PbTableChangeLog,
22 StateTableInfoDelta,
23};
24
25use crate::change_log::{
26 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon, resolve_pb_log_epochs,
27};
28use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo};
29use crate::{HummockVersionId, INVALID_VERSION_ID};
30
/// A Hummock version whose change logs carry `()` placeholders instead of SST payloads.
///
/// Only the version id, the state table info and, per table, the shape of the change log
/// (number of new/old values plus the epochs) are kept.
#[derive(Clone, Debug)]
pub struct FrontendHummockVersion {
    /// Id of this version.
    pub id: HummockVersionId,
    /// State table info carried by this version.
    pub state_table_info: HummockVersionStateTableInfo,
    /// Change log of each table, keyed by table id, with unit placeholders as values.
    pub table_change_log: HashMap<TableId, TableChangeLogCommon<()>>,
}
37
38impl FrontendHummockVersion {
39 pub fn from_version(version: &HummockVersion) -> Self {
40 Self {
41 id: version.id,
42 state_table_info: version.state_table_info.clone(),
43 table_change_log: version
44 .table_change_log
45 .iter()
46 .map(|(table_id, change_log)| {
47 (
48 *table_id,
49 TableChangeLogCommon::new(change_log.iter().map(|change_log| {
50 EpochNewChangeLogCommon {
51 new_value: vec![(); change_log.new_value.len()],
52 old_value: vec![(); change_log.new_value.len()],
53 non_checkpoint_epochs: change_log.non_checkpoint_epochs.clone(),
54 checkpoint_epoch: change_log.checkpoint_epoch,
55 }
56 })),
57 )
58 })
59 .collect(),
60 }
61 }
62
63 pub fn to_protobuf(&self) -> PbHummockVersion {
64 #[expect(deprecated)]
65 PbHummockVersion {
66 id: self.id.0,
67 levels: Default::default(),
68 max_committed_epoch: INVALID_EPOCH,
69 table_watermarks: Default::default(),
70 table_change_logs: self
71 .table_change_log
72 .iter()
73 .map(|(table_id, change_log)| {
74 (
75 table_id.table_id,
76 PbTableChangeLog {
77 change_logs: change_log
78 .iter()
79 .map(|change_log| PbEpochNewChangeLog {
80 old_value: vec![
81 PbSstableInfo::default();
82 change_log.old_value.len()
83 ],
84 new_value: vec![
85 PbSstableInfo::default();
86 change_log.new_value.len()
87 ],
88 epochs: change_log.epochs().collect(),
89 })
90 .collect(),
91 },
92 )
93 })
94 .collect(),
95 state_table_info: self.state_table_info.to_protobuf(),
96 vector_indexes: Default::default(),
97 }
98 }
99
100 pub fn from_protobuf(value: PbHummockVersion) -> Self {
101 Self {
102 id: HummockVersionId(value.id),
103 state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info),
104 table_change_log: value
105 .table_change_logs
106 .into_iter()
107 .map(|(table_id, change_log)| {
108 (
109 TableId::new(table_id),
110 TableChangeLogCommon::new(change_log.change_logs.into_iter().map(
111 |change_log| {
112 let (non_checkpoint_epochs, checkpoint_epoch) =
113 resolve_pb_log_epochs(&change_log.epochs);
114 EpochNewChangeLogCommon {
115 new_value: vec![(); change_log.new_value.len()],
117 old_value: vec![(); change_log.old_value.len()],
118 non_checkpoint_epochs,
119 checkpoint_epoch,
120 }
121 },
122 )),
123 )
124 })
125 .collect(),
126 }
127 }
128
129 pub fn apply_delta(&mut self, delta: FrontendHummockVersionDelta) {
130 if self.id != INVALID_VERSION_ID {
131 assert_eq!(self.id, delta.prev_id);
132 }
133 self.id = delta.id;
134 let (changed_table_info, _) = self
135 .state_table_info
136 .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id);
137 HummockVersion::apply_change_log_delta(
138 &mut self.table_change_log,
139 &delta.change_log_delta,
140 &delta.removed_table_id,
141 &delta.state_table_info_delta,
142 &changed_table_info,
143 );
144 }
145}
146
/// A Hummock version delta whose change log deltas carry `()` placeholders instead of
/// SST payloads.
pub struct FrontendHummockVersionDelta {
    /// Id of the version this delta is applied on top of.
    pub prev_id: HummockVersionId,
    /// Id of the version that results from applying this delta.
    pub id: HummockVersionId,
    /// Ids of the tables removed by this delta.
    pub removed_table_id: HashSet<TableId>,
    /// State table info changes, keyed by table id.
    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
    /// Change log changes, keyed by table id, with unit placeholders as values.
    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<()>>,
}
154
155impl FrontendHummockVersionDelta {
156 pub fn from_delta(delta: &HummockVersionDelta) -> Self {
157 Self {
158 prev_id: delta.prev_id,
159 id: delta.id,
160 removed_table_id: delta.removed_table_ids.clone(),
161 state_table_info_delta: delta.state_table_info_delta.clone(),
162 change_log_delta: delta
163 .change_log_delta
164 .iter()
165 .map(|(table_id, change_log_delta)| {
166 (
167 *table_id,
168 ChangeLogDeltaCommon {
169 truncate_epoch: change_log_delta.truncate_epoch,
170 new_log: EpochNewChangeLogCommon {
171 new_value: vec![(); change_log_delta.new_log.new_value.len()],
173 old_value: vec![(); change_log_delta.new_log.old_value.len()],
174 non_checkpoint_epochs: change_log_delta
175 .new_log
176 .non_checkpoint_epochs
177 .clone(),
178 checkpoint_epoch: change_log_delta.new_log.checkpoint_epoch,
179 },
180 },
181 )
182 })
183 .collect(),
184 }
185 }
186
187 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
188 #[expect(deprecated)]
189 PbHummockVersionDelta {
190 id: self.id.to_u64(),
191 prev_id: self.prev_id.to_u64(),
192 group_deltas: Default::default(),
193 max_committed_epoch: INVALID_EPOCH,
194 trivial_move: false,
195 new_table_watermarks: Default::default(),
196 removed_table_ids: self
197 .removed_table_id
198 .iter()
199 .map(|table_id| table_id.table_id)
200 .collect(),
201 change_log_delta: self
202 .change_log_delta
203 .iter()
204 .map(|(table_id, delta)| {
205 (
206 table_id.table_id,
207 PbChangeLogDelta {
208 new_log: Some(PbEpochNewChangeLog {
209 old_value: vec![
211 PbSstableInfo::default();
212 delta.new_log.old_value.len()
213 ],
214 new_value: vec![
215 PbSstableInfo::default();
216 delta.new_log.new_value.len()
217 ],
218 epochs: delta.new_log.epochs().collect(),
219 }),
220 truncate_epoch: delta.truncate_epoch,
221 },
222 )
223 })
224 .collect(),
225 state_table_info_delta: self
226 .state_table_info_delta
227 .iter()
228 .map(|(table_id, delta)| (table_id.table_id, *delta))
229 .collect(),
230 vector_index_delta: Default::default(),
231 }
232 }
233
234 pub fn from_protobuf(delta: PbHummockVersionDelta) -> Self {
235 Self {
236 prev_id: HummockVersionId::new(delta.prev_id),
237 id: HummockVersionId::new(delta.id),
238 removed_table_id: delta
239 .removed_table_ids
240 .iter()
241 .map(|table_id| TableId::new(*table_id))
242 .collect(),
243 state_table_info_delta: delta
244 .state_table_info_delta
245 .into_iter()
246 .map(|(table_id, delta)| (TableId::new(table_id), delta))
247 .collect(),
248 change_log_delta: delta
249 .change_log_delta
250 .iter()
251 .map(|(table_id, change_log_delta)| {
252 (
253 TableId::new(*table_id),
254 ChangeLogDeltaCommon {
255 truncate_epoch: change_log_delta.truncate_epoch,
256 new_log: change_log_delta
257 .new_log
258 .as_ref()
259 .map(|new_log| {
260 let (non_checkpoint_epochs, checkpoint_epoch) =
261 resolve_pb_log_epochs(&new_log.epochs);
262 EpochNewChangeLogCommon {
263 new_value: vec![(); new_log.new_value.len()],
265 old_value: vec![(); new_log.old_value.len()],
266 non_checkpoint_epochs,
267 checkpoint_epoch,
268 }
269 })
270 .unwrap(),
271 },
272 )
273 })
274 .collect(),
275 }
276 }
277}