risingwave_hummock_sdk/
time_travel.rs1use std::collections::{HashMap, HashSet};
16
17use risingwave_pb::hummock::hummock_version::PbLevels;
18use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas};
19use risingwave_pb::hummock::{PbLevel, PbSstableInfo, group_delta};
20
21use crate::compaction_group::StateTableId;
22use crate::level::Level;
23use crate::sstable_info::SstableInfo;
24use crate::version::{
25 HummockVersion, HummockVersionCommon, HummockVersionDelta, HummockVersionDeltaCommon,
26 ObjectIdReader, SstableIdReader,
27};
28use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId};
29
30pub type IncompleteHummockVersion = HummockVersionCommon<SstableIdInVersion>;
31
32pub fn refill_version(
35 version: &mut HummockVersion,
36 sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
37 table_id: u32,
38) {
39 for level in version.levels.values_mut().flat_map(|level| {
40 level
41 .l0
42 .sub_levels
43 .iter_mut()
44 .rev()
45 .chain(level.levels.iter_mut())
46 }) {
47 refill_level(level, sst_id_to_info);
48 level
49 .table_infos
50 .retain(|t| t.table_ids.contains(&table_id));
51 }
52}
53
54fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>) {
55 for s in &mut level.table_infos {
56 refill_sstable_info(s, sst_id_to_info);
57 }
58}
59
60fn refill_sstable_info(
62 sstable_info: &mut SstableInfo,
63 sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
64) {
65 *sstable_info = sst_id_to_info
66 .get(&sstable_info.sst_id)
67 .unwrap_or_else(|| panic!("SstableInfo should exist"))
68 .clone();
69}
70
71impl From<(&HummockVersion, &HashSet<StateTableId>)> for IncompleteHummockVersion {
73 fn from(p: (&HummockVersion, &HashSet<StateTableId>)) -> Self {
74 let (version, time_travel_table_ids) = p;
75 #[expect(deprecated)]
76 Self {
77 id: version.id,
78 levels: version
79 .levels
80 .iter()
81 .map(|(group_id, levels)| {
82 let pblevels = rewrite_levels(PbLevels::from(levels), time_travel_table_ids);
83 (*group_id as CompactionGroupId, pblevels.into())
84 })
85 .collect(),
86 max_committed_epoch: version.max_committed_epoch,
87 table_watermarks: version.table_watermarks.clone(),
88 table_change_log: HashMap::default(),
90 state_table_info: version.state_table_info.clone(),
91 vector_indexes: version.vector_indexes.clone(),
92 }
93 }
94}
95
96fn rewrite_levels(mut levels: PbLevels, time_travel_table_ids: &HashSet<StateTableId>) -> PbLevels {
98 fn rewrite_level(level: &mut PbLevel, time_travel_table_ids: &HashSet<StateTableId>) {
99 level.table_infos.retain(|sst| {
101 sst.table_ids
102 .iter()
103 .any(|tid| time_travel_table_ids.contains(tid))
104 });
105 }
106 for level in &mut levels.levels {
107 rewrite_level(level, time_travel_table_ids);
108 }
109 if let Some(l0) = levels.l0.as_mut() {
110 for sub_level in &mut l0.sub_levels {
111 rewrite_level(sub_level, time_travel_table_ids);
112 }
113 l0.sub_levels.retain(|s| !s.table_infos.is_empty());
114 }
115 levels
116}
117
118pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon<SstableIdInVersion>;
122
123impl From<(&HummockVersionDelta, &HashSet<StateTableId>)> for IncompleteHummockVersionDelta {
125 fn from(p: (&HummockVersionDelta, &HashSet<StateTableId>)) -> Self {
126 let (delta, time_travel_table_ids) = p;
127 #[expect(deprecated)]
128 Self {
129 id: delta.id,
130 prev_id: delta.prev_id,
131 group_deltas: delta
132 .group_deltas
133 .iter()
134 .map(|(cg_id, deltas)| {
135 let pb_group_deltas =
136 rewrite_group_deltas(PbGroupDeltas::from(deltas), time_travel_table_ids);
137 (*cg_id, pb_group_deltas.into())
138 })
139 .collect(),
140 max_committed_epoch: delta.max_committed_epoch,
141 trivial_move: delta.trivial_move,
142 new_table_watermarks: delta.new_table_watermarks.clone(),
143 removed_table_ids: delta.removed_table_ids.clone(),
144 change_log_delta: delta
145 .change_log_delta
146 .iter()
147 .filter_map(|(table_id, log_delta)| {
148 if !time_travel_table_ids.contains(&table_id.table_id()) {
149 return None;
150 }
151 debug_assert!(
152 log_delta
153 .new_log
154 .new_value
155 .iter()
156 .chain(log_delta.new_log.old_value.iter())
157 .all(|s| {
158 s.table_ids
159 .iter()
160 .any(|tid| time_travel_table_ids.contains(tid))
161 })
162 );
163
164 Some((*table_id, PbChangeLogDelta::from(log_delta).into()))
165 })
166 .collect(),
167 state_table_info_delta: delta.state_table_info_delta.clone(),
168 vector_index_delta: delta.vector_index_delta.clone(),
169 }
170 }
171}
172
173fn rewrite_group_deltas(
175 mut group_deltas: PbGroupDeltas,
176 time_travel_table_ids: &HashSet<StateTableId>,
177) -> PbGroupDeltas {
178 for group_delta in &mut group_deltas.group_deltas {
179 let Some(group_delta::DeltaType::NewL0SubLevel(new_sub_level)) =
180 &mut group_delta.delta_type
181 else {
182 tracing::error!(?group_delta, "unexpected delta type");
183 continue;
184 };
185 new_sub_level.inserted_table_infos.retain(|sst| {
186 sst.table_ids
187 .iter()
188 .any(|tid| time_travel_table_ids.contains(tid))
189 });
190 }
191 group_deltas
192}
193
194pub struct SstableIdInVersion {
195 sst_id: HummockSstableId,
196 object_id: HummockSstableObjectId,
197}
198
199impl SstableIdReader for SstableIdInVersion {
200 fn sst_id(&self) -> HummockSstableId {
201 self.sst_id
202 }
203}
204
205impl ObjectIdReader for SstableIdInVersion {
206 fn object_id(&self) -> HummockSstableObjectId {
207 self.object_id
208 }
209}
210
211impl From<&SstableIdInVersion> for PbSstableInfo {
212 fn from(sst_id: &SstableIdInVersion) -> Self {
213 Self {
214 sst_id: sst_id.sst_id.inner(),
215 object_id: sst_id.object_id.inner(),
216 ..Default::default()
217 }
218 }
219}
220
221impl From<SstableIdInVersion> for PbSstableInfo {
222 fn from(sst_id: SstableIdInVersion) -> Self {
223 (&sst_id).into()
224 }
225}
226
227impl From<&PbSstableInfo> for SstableIdInVersion {
228 fn from(s: &PbSstableInfo) -> Self {
229 SstableIdInVersion {
230 sst_id: s.sst_id.into(),
231 object_id: s.object_id.into(),
232 }
233 }
234}
235
236impl From<PbSstableInfo> for SstableIdInVersion {
237 fn from(value: PbSstableInfo) -> Self {
238 (&value).into()
239 }
240}