1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::TableId;
18use risingwave_common::util::epoch::INVALID_EPOCH;
19use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
20use risingwave_pb::hummock::{
21 PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbSstableInfo, PbTableChangeLog,
22 StateTableInfoDelta,
23};
24
25use crate::change_log::{ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon};
26use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo};
27use crate::{HummockVersionId, INVALID_VERSION_ID};
28
29#[derive(Clone, Debug)]
30pub struct FrontendHummockVersion {
31 pub id: HummockVersionId,
32 pub state_table_info: HummockVersionStateTableInfo,
33 pub table_change_log: HashMap<TableId, TableChangeLogCommon<()>>,
34}
35
36impl FrontendHummockVersion {
37 pub fn from_version(version: &HummockVersion) -> Self {
38 Self {
39 id: version.id,
40 state_table_info: version.state_table_info.clone(),
41 table_change_log: version
42 .table_change_log
43 .iter()
44 .map(|(table_id, change_log)| {
45 (
46 *table_id,
47 TableChangeLogCommon::new(change_log.iter().map(|change_log| {
48 EpochNewChangeLogCommon {
49 new_value: vec![(); change_log.new_value.len()],
50 old_value: vec![(); change_log.new_value.len()],
51 epochs: change_log.epochs.clone(),
52 }
53 })),
54 )
55 })
56 .collect(),
57 }
58 }
59
60 pub fn to_protobuf(&self) -> PbHummockVersion {
61 #[expect(deprecated)]
62 PbHummockVersion {
63 id: self.id.0,
64 levels: Default::default(),
65 max_committed_epoch: INVALID_EPOCH,
66 table_watermarks: Default::default(),
67 table_change_logs: self
68 .table_change_log
69 .iter()
70 .map(|(table_id, change_log)| {
71 (
72 table_id.table_id,
73 PbTableChangeLog {
74 change_logs: change_log
75 .iter()
76 .map(|change_log| PbEpochNewChangeLog {
77 old_value: vec![
78 PbSstableInfo::default();
79 change_log.old_value.len()
80 ],
81 new_value: vec![
82 PbSstableInfo::default();
83 change_log.new_value.len()
84 ],
85 epochs: change_log.epochs.clone(),
86 })
87 .collect(),
88 },
89 )
90 })
91 .collect(),
92 state_table_info: self.state_table_info.to_protobuf(),
93 }
94 }
95
96 pub fn from_protobuf(value: PbHummockVersion) -> Self {
97 Self {
98 id: HummockVersionId(value.id),
99 state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info),
100 table_change_log: value
101 .table_change_logs
102 .into_iter()
103 .map(|(table_id, change_log)| {
104 (
105 TableId::new(table_id),
106 TableChangeLogCommon::new(change_log.change_logs.into_iter().map(
107 |change_log| EpochNewChangeLogCommon {
108 new_value: vec![(); change_log.new_value.len()],
110 old_value: vec![(); change_log.old_value.len()],
111 epochs: change_log.epochs,
112 },
113 )),
114 )
115 })
116 .collect(),
117 }
118 }
119
120 pub fn apply_delta(&mut self, delta: FrontendHummockVersionDelta) {
121 if self.id != INVALID_VERSION_ID {
122 assert_eq!(self.id, delta.prev_id);
123 }
124 self.id = delta.id;
125 let (changed_table_info, _) = self
126 .state_table_info
127 .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id);
128 HummockVersion::apply_change_log_delta(
129 &mut self.table_change_log,
130 &delta.change_log_delta,
131 &delta.removed_table_id,
132 &delta.state_table_info_delta,
133 &changed_table_info,
134 );
135 }
136}
137
138pub struct FrontendHummockVersionDelta {
139 pub prev_id: HummockVersionId,
140 pub id: HummockVersionId,
141 pub removed_table_id: HashSet<TableId>,
142 pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
143 pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<()>>,
144}
145
146impl FrontendHummockVersionDelta {
147 pub fn from_delta(delta: &HummockVersionDelta) -> Self {
148 Self {
149 prev_id: delta.prev_id,
150 id: delta.id,
151 removed_table_id: delta.removed_table_ids.clone(),
152 state_table_info_delta: delta.state_table_info_delta.clone(),
153 change_log_delta: delta
154 .change_log_delta
155 .iter()
156 .map(|(table_id, change_log_delta)| {
157 (
158 *table_id,
159 ChangeLogDeltaCommon {
160 truncate_epoch: change_log_delta.truncate_epoch,
161 new_log: EpochNewChangeLogCommon {
162 new_value: vec![(); change_log_delta.new_log.new_value.len()],
164 old_value: vec![(); change_log_delta.new_log.old_value.len()],
165 epochs: change_log_delta.new_log.epochs.clone(),
166 },
167 },
168 )
169 })
170 .collect(),
171 }
172 }
173
174 pub fn to_protobuf(&self) -> PbHummockVersionDelta {
175 #[expect(deprecated)]
176 PbHummockVersionDelta {
177 id: self.id.to_u64(),
178 prev_id: self.prev_id.to_u64(),
179 group_deltas: Default::default(),
180 max_committed_epoch: INVALID_EPOCH,
181 trivial_move: false,
182 new_table_watermarks: Default::default(),
183 removed_table_ids: self
184 .removed_table_id
185 .iter()
186 .map(|table_id| table_id.table_id)
187 .collect(),
188 change_log_delta: self
189 .change_log_delta
190 .iter()
191 .map(|(table_id, delta)| {
192 (
193 table_id.table_id,
194 PbChangeLogDelta {
195 new_log: Some(PbEpochNewChangeLog {
196 old_value: vec![
198 PbSstableInfo::default();
199 delta.new_log.old_value.len()
200 ],
201 new_value: vec![
202 PbSstableInfo::default();
203 delta.new_log.new_value.len()
204 ],
205 epochs: delta.new_log.epochs.clone(),
206 }),
207 truncate_epoch: delta.truncate_epoch,
208 },
209 )
210 })
211 .collect(),
212 state_table_info_delta: self
213 .state_table_info_delta
214 .iter()
215 .map(|(table_id, delta)| (table_id.table_id, *delta))
216 .collect(),
217 }
218 }
219
220 pub fn from_protobuf(delta: PbHummockVersionDelta) -> Self {
221 Self {
222 prev_id: HummockVersionId::new(delta.prev_id),
223 id: HummockVersionId::new(delta.id),
224 removed_table_id: delta
225 .removed_table_ids
226 .iter()
227 .map(|table_id| TableId::new(*table_id))
228 .collect(),
229 state_table_info_delta: delta
230 .state_table_info_delta
231 .into_iter()
232 .map(|(table_id, delta)| (TableId::new(table_id), delta))
233 .collect(),
234 change_log_delta: delta
235 .change_log_delta
236 .iter()
237 .map(|(table_id, change_log_delta)| {
238 (
239 TableId::new(*table_id),
240 ChangeLogDeltaCommon {
241 truncate_epoch: change_log_delta.truncate_epoch,
242 new_log: change_log_delta
243 .new_log
244 .as_ref()
245 .map(|new_log| {
246 EpochNewChangeLogCommon {
247 new_value: vec![(); new_log.new_value.len()],
249 old_value: vec![(); new_log.old_value.len()],
250 epochs: new_log.epochs.clone(),
251 }
252 })
253 .unwrap(),
254 },
255 )
256 })
257 .collect(),
258 }
259 }
260}