risingwave_hummock_sdk/
time_travel.rs1use std::collections::{HashMap, HashSet};
16
17use risingwave_pb::hummock::hummock_version::PbLevels;
18use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas};
19use risingwave_pb::hummock::{PbLevel, PbSstableInfo, group_delta};
20
21use crate::compaction_group::StateTableId;
22use crate::level::Level;
23use crate::sstable_info::SstableInfo;
24use crate::version::{
25 HummockVersion, HummockVersionCommon, HummockVersionDelta, HummockVersionDeltaCommon,
26 ObjectIdReader, SstableIdReader,
27};
28use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId};
29
30pub type IncompleteHummockVersion = HummockVersionCommon<SstableIdInVersion>;
31
32pub fn refill_version(
35 version: &mut HummockVersion,
36 sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
37 table_id: u32,
38) {
39 for level in version.levels.values_mut().flat_map(|level| {
40 level
41 .l0
42 .sub_levels
43 .iter_mut()
44 .rev()
45 .chain(level.levels.iter_mut())
46 }) {
47 refill_level(level, sst_id_to_info);
48 level
49 .table_infos
50 .retain(|t| t.table_ids.contains(&table_id));
51 }
52}
53
54fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>) {
55 for s in &mut level.table_infos {
56 refill_sstable_info(s, sst_id_to_info);
57 }
58}
59
60fn refill_sstable_info(
62 sstable_info: &mut SstableInfo,
63 sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
64) {
65 *sstable_info = sst_id_to_info
66 .get(&sstable_info.sst_id)
67 .unwrap_or_else(|| panic!("SstableInfo should exist"))
68 .clone();
69}
70
71impl From<(&HummockVersion, &HashSet<StateTableId>)> for IncompleteHummockVersion {
73 fn from(p: (&HummockVersion, &HashSet<StateTableId>)) -> Self {
74 let (version, time_travel_table_ids) = p;
75 #[expect(deprecated)]
76 Self {
77 id: version.id,
78 levels: version
79 .levels
80 .iter()
81 .map(|(group_id, levels)| {
82 let pblevels = rewrite_levels(PbLevels::from(levels), time_travel_table_ids);
83 (*group_id as CompactionGroupId, pblevels.into())
84 })
85 .collect(),
86 max_committed_epoch: version.max_committed_epoch,
87 table_watermarks: version.table_watermarks.clone(),
88 table_change_log: HashMap::default(),
90 state_table_info: version.state_table_info.clone(),
91 }
92 }
93}
94
95fn rewrite_levels(mut levels: PbLevels, time_travel_table_ids: &HashSet<StateTableId>) -> PbLevels {
97 fn rewrite_level(level: &mut PbLevel, time_travel_table_ids: &HashSet<StateTableId>) {
98 level.table_infos.retain(|sst| {
100 sst.table_ids
101 .iter()
102 .any(|tid| time_travel_table_ids.contains(tid))
103 });
104 }
105 for level in &mut levels.levels {
106 rewrite_level(level, time_travel_table_ids);
107 }
108 if let Some(l0) = levels.l0.as_mut() {
109 for sub_level in &mut l0.sub_levels {
110 rewrite_level(sub_level, time_travel_table_ids);
111 }
112 l0.sub_levels.retain(|s| !s.table_infos.is_empty());
113 }
114 levels
115}
116
117pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon<SstableIdInVersion>;
121
122impl From<(&HummockVersionDelta, &HashSet<StateTableId>)> for IncompleteHummockVersionDelta {
124 fn from(p: (&HummockVersionDelta, &HashSet<StateTableId>)) -> Self {
125 let (delta, time_travel_table_ids) = p;
126 #[expect(deprecated)]
127 Self {
128 id: delta.id,
129 prev_id: delta.prev_id,
130 group_deltas: delta
131 .group_deltas
132 .iter()
133 .map(|(cg_id, deltas)| {
134 let pb_group_deltas =
135 rewrite_group_deltas(PbGroupDeltas::from(deltas), time_travel_table_ids);
136 (*cg_id, pb_group_deltas.into())
137 })
138 .collect(),
139 max_committed_epoch: delta.max_committed_epoch,
140 trivial_move: delta.trivial_move,
141 new_table_watermarks: delta.new_table_watermarks.clone(),
142 removed_table_ids: delta.removed_table_ids.clone(),
143 change_log_delta: delta
144 .change_log_delta
145 .iter()
146 .filter_map(|(table_id, log_delta)| {
147 if !time_travel_table_ids.contains(&table_id.table_id()) {
148 return None;
149 }
150 debug_assert!(
151 log_delta
152 .new_log
153 .new_value
154 .iter()
155 .chain(log_delta.new_log.old_value.iter())
156 .all(|s| {
157 s.table_ids
158 .iter()
159 .any(|tid| time_travel_table_ids.contains(tid))
160 })
161 );
162
163 Some((*table_id, PbChangeLogDelta::from(log_delta).into()))
164 })
165 .collect(),
166 state_table_info_delta: delta.state_table_info_delta.clone(),
167 }
168 }
169}
170
171fn rewrite_group_deltas(
173 mut group_deltas: PbGroupDeltas,
174 time_travel_table_ids: &HashSet<StateTableId>,
175) -> PbGroupDeltas {
176 for group_delta in &mut group_deltas.group_deltas {
177 let Some(group_delta::DeltaType::NewL0SubLevel(new_sub_level)) =
178 &mut group_delta.delta_type
179 else {
180 tracing::error!(?group_delta, "unexpected delta type");
181 continue;
182 };
183 new_sub_level.inserted_table_infos.retain(|sst| {
184 sst.table_ids
185 .iter()
186 .any(|tid| time_travel_table_ids.contains(tid))
187 });
188 }
189 group_deltas
190}
191
192pub struct SstableIdInVersion {
193 sst_id: HummockSstableId,
194 object_id: HummockSstableObjectId,
195}
196
197impl SstableIdReader for SstableIdInVersion {
198 fn sst_id(&self) -> HummockSstableId {
199 self.sst_id
200 }
201}
202
203impl ObjectIdReader for SstableIdInVersion {
204 fn object_id(&self) -> HummockSstableObjectId {
205 self.object_id
206 }
207}
208
209impl From<&SstableIdInVersion> for PbSstableInfo {
210 fn from(sst_id: &SstableIdInVersion) -> Self {
211 Self {
212 sst_id: sst_id.sst_id,
213 object_id: sst_id.object_id,
214 ..Default::default()
215 }
216 }
217}
218
219impl From<SstableIdInVersion> for PbSstableInfo {
220 fn from(sst_id: SstableIdInVersion) -> Self {
221 (&sst_id).into()
222 }
223}
224
225impl From<&PbSstableInfo> for SstableIdInVersion {
226 fn from(s: &PbSstableInfo) -> Self {
227 SstableIdInVersion {
228 sst_id: s.sst_id,
229 object_id: s.object_id,
230 }
231 }
232}
233
234impl From<PbSstableInfo> for SstableIdInVersion {
235 fn from(value: PbSstableInfo) -> Self {
236 (&value).into()
237 }
238}