1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::TableId;
18use risingwave_common::util::epoch::INVALID_EPOCH;
19use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
20use risingwave_pb::hummock::{
21 PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbSstableInfo, PbTableChangeLog,
22 StateTableInfoDelta,
23};
24
25use crate::change_log::{
26 ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon, resolve_pb_log_epochs,
27};
28use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo};
29use crate::{HummockVersionId, INVALID_VERSION_ID};
30
31#[derive(Clone, Debug)]
32pub struct FrontendHummockVersion {
33 pub id: HummockVersionId,
34 pub state_table_info: HummockVersionStateTableInfo,
35 pub table_change_log: HashMap<TableId, TableChangeLogCommon<()>>,
36}
37
38impl FrontendHummockVersion {
39 pub fn from_version(version: &HummockVersion) -> Self {
40 Self {
41 id: version.id,
42 state_table_info: version.state_table_info.clone(),
43 table_change_log: version
44 .table_change_log
45 .iter()
46 .map(|(table_id, change_log)| {
47 (
48 *table_id,
49 TableChangeLogCommon::new(change_log.iter().map(|change_log| {
50 EpochNewChangeLogCommon {
51 new_value: vec![(); change_log.new_value.len()],
52 old_value: vec![(); change_log.new_value.len()],
53 non_checkpoint_epochs: change_log.non_checkpoint_epochs.clone(),
54 checkpoint_epoch: change_log.checkpoint_epoch,
55 }
56 })),
57 )
58 })
59 .collect(),
60 }
61 }
62
63 pub fn to_protobuf(&self) -> PbHummockVersion {
64 #[expect(deprecated)]
65 PbHummockVersion {
66 id: self.id.0,
67 levels: Default::default(),
68 max_committed_epoch: INVALID_EPOCH,
69 table_watermarks: Default::default(),
70 table_change_logs: self
71 .table_change_log
72 .iter()
73 .map(|(table_id, change_log)| {
74 (
75 *table_id,
76 PbTableChangeLog {
77 change_logs: change_log
78 .iter()
79 .map(|change_log| PbEpochNewChangeLog {
80 old_value: vec![
81 PbSstableInfo::default();
82 change_log.old_value.len()
83 ],
84 new_value: vec![
85 PbSstableInfo::default();
86 change_log.new_value.len()
87 ],
88 epochs: change_log.epochs().collect(),
89 })
90 .collect(),
91 },
92 )
93 })
94 .collect(),
95 state_table_info: self.state_table_info.info().clone(),
96 vector_indexes: Default::default(),
97 }
98 }
99
100 pub fn from_protobuf(value: PbHummockVersion) -> Self {
101 Self {
102 id: HummockVersionId(value.id),
103 state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info),
104 table_change_log: value
105 .table_change_logs
106 .into_iter()
107 .map(|(table_id, change_log)| {
108 (
109 table_id,
110 TableChangeLogCommon::new(change_log.change_logs.into_iter().map(
111 |change_log| {
112 let (non_checkpoint_epochs, checkpoint_epoch) =
113 resolve_pb_log_epochs(&change_log.epochs);
114 EpochNewChangeLogCommon {
115 new_value: vec![(); change_log.new_value.len()],
117 old_value: vec![(); change_log.old_value.len()],
118 non_checkpoint_epochs,
119 checkpoint_epoch,
120 }
121 },
122 )),
123 )
124 })
125 .collect(),
126 }
127 }
128
129 pub fn apply_delta(&mut self, delta: FrontendHummockVersionDelta) {
130 if self.id != INVALID_VERSION_ID {
131 assert_eq!(self.id, delta.prev_id);
132 }
133 self.id = delta.id;
134 let (changed_table_info, _) = self
135 .state_table_info
136 .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id);
137 HummockVersion::apply_change_log_delta(
138 &mut self.table_change_log,
139 &delta.change_log_delta,
140 &delta.removed_table_id,
141 &delta.state_table_info_delta,
142 &changed_table_info,
143 );
144 }
145}
146
147pub struct FrontendHummockVersionDelta {
148 pub prev_id: HummockVersionId,
149 pub id: HummockVersionId,
150 pub removed_table_id: HashSet<TableId>,
151 pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
152 pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<()>>,
153}
154
155impl FrontendHummockVersionDelta {
156 pub fn from_delta(delta: &HummockVersionDelta) -> Self {
157 Self {
158 prev_id: delta.prev_id,
159 id: delta.id,
160 removed_table_id: delta.removed_table_ids.clone(),
161 state_table_info_delta: delta.state_table_info_delta.clone(),
162 change_log_delta: delta
163 .change_log_delta
164 .iter()
165 .map(|(table_id, change_log_delta)| {
166 (
167 *table_id,
168 ChangeLogDeltaCommon {
169 truncate_epoch: change_log_delta.truncate_epoch,
170 new_log: EpochNewChangeLogCommon {
171 new_value: vec![(); change_log_delta.new_log.new_value.len()],
173 old_value: vec![(); change_log_delta.new_log.old_value.len()],
174 non_checkpoint_epochs: change_log_delta
175 .new_log
176 .non_checkpoint_epochs
177 .clone(),
178 checkpoint_epoch: change_log_delta.new_log.checkpoint_epoch,
179 },
180 },
181 )
182 })
183 .collect(),
184 }
185 }
186
187 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
188 #[expect(deprecated)]
189 PbHummockVersionDelta {
190 id: self.id.to_u64(),
191 prev_id: self.prev_id.to_u64(),
192 group_deltas: Default::default(),
193 max_committed_epoch: INVALID_EPOCH,
194 trivial_move: false,
195 new_table_watermarks: Default::default(),
196 removed_table_ids: self.removed_table_id.iter().copied().collect(),
197 change_log_delta: self
198 .change_log_delta
199 .iter()
200 .map(|(table_id, delta)| {
201 (
202 *table_id,
203 PbChangeLogDelta {
204 new_log: Some(PbEpochNewChangeLog {
205 old_value: vec![
207 PbSstableInfo::default();
208 delta.new_log.old_value.len()
209 ],
210 new_value: vec![
211 PbSstableInfo::default();
212 delta.new_log.new_value.len()
213 ],
214 epochs: delta.new_log.epochs().collect(),
215 }),
216 truncate_epoch: delta.truncate_epoch,
217 },
218 )
219 })
220 .collect(),
221 state_table_info_delta: self
222 .state_table_info_delta
223 .iter()
224 .map(|(table_id, delta)| (*table_id, *delta))
225 .collect(),
226 vector_index_delta: Default::default(),
227 }
228 }
229
230 pub fn from_protobuf(delta: PbHummockVersionDelta) -> Self {
231 Self {
232 prev_id: HummockVersionId::new(delta.prev_id),
233 id: HummockVersionId::new(delta.id),
234 removed_table_id: delta.removed_table_ids.into_iter().collect(),
235 state_table_info_delta: delta.state_table_info_delta.clone(),
236 change_log_delta: delta
237 .change_log_delta
238 .iter()
239 .map(|(table_id, change_log_delta)| {
240 (
241 *table_id,
242 ChangeLogDeltaCommon {
243 truncate_epoch: change_log_delta.truncate_epoch,
244 new_log: change_log_delta
245 .new_log
246 .as_ref()
247 .map(|new_log| {
248 let (non_checkpoint_epochs, checkpoint_epoch) =
249 resolve_pb_log_epochs(&new_log.epochs);
250 EpochNewChangeLogCommon {
251 new_value: vec![(); new_log.new_value.len()],
253 old_value: vec![(); new_log.old_value.len()],
254 non_checkpoint_epochs,
255 checkpoint_epoch,
256 }
257 })
258 .unwrap(),
259 },
260 )
261 })
262 .collect(),
263 }
264 }
265}