risingwave_hummock_sdk/
time_travel.rs1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::PbSstableInfo;
19use risingwave_pb::hummock::hummock_version::PbLevels;
20use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas};
21
22use crate::compaction_group::StateTableId;
23use crate::level::{Level, Levels, LevelsCommon};
24use crate::sstable_info::SstableInfo;
25use crate::version::{
26 GroupDelta, GroupDeltas, GroupDeltasCommon, HummockVersion, HummockVersionCommon,
27 HummockVersionDelta, HummockVersionDeltaCommon, ObjectIdReader, SstableIdReader,
28};
29use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId};
30
31pub type IncompleteHummockVersion = HummockVersionCommon<SstableIdInVersion>;
32
33pub fn refill_version(
36 version: &mut HummockVersion,
37 sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
38 table_id: TableId,
39) {
40 for level in version.levels.values_mut().flat_map(|level| {
41 level
42 .l0
43 .sub_levels
44 .iter_mut()
45 .rev()
46 .chain(level.levels.iter_mut())
47 }) {
48 refill_level(level, sst_id_to_info);
49 level
50 .table_infos
51 .retain(|t| t.table_ids.contains(&table_id));
52 }
53}
54
55fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>) {
56 for s in &mut level.table_infos {
57 refill_sstable_info(s, sst_id_to_info);
58 }
59}
60
61fn refill_sstable_info(
63 sstable_info: &mut SstableInfo,
64 sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
65) {
66 *sstable_info = sst_id_to_info
67 .get(&sstable_info.sst_id)
68 .unwrap_or_else(|| panic!("SstableInfo should exist"))
69 .clone();
70}
71
72impl From<(&HummockVersion, &HashSet<StateTableId>)> for IncompleteHummockVersion {
74 fn from(p: (&HummockVersion, &HashSet<StateTableId>)) -> Self {
75 let (version, time_travel_table_ids) = p;
76 #[expect(deprecated)]
77 Self {
78 id: version.id,
79 levels: version
80 .levels
81 .iter()
82 .map(|(group_id, levels)| {
83 let levels = rewrite_levels(levels, time_travel_table_ids);
84 (*group_id as CompactionGroupId, levels)
85 })
86 .collect(),
87 max_committed_epoch: version.max_committed_epoch,
88 table_watermarks: version.table_watermarks.clone(),
89 table_change_log: HashMap::default(),
91 state_table_info: version.state_table_info.clone(),
92 vector_indexes: version.vector_indexes.clone(),
93 }
94 }
95}
96
97fn rewrite_levels(
99 levels: &Levels,
100 time_travel_table_ids: &HashSet<StateTableId>,
101) -> LevelsCommon<SstableIdInVersion> {
102 fn rewrite_level(level: &mut Level, time_travel_table_ids: &HashSet<StateTableId>) {
103 level.table_infos.retain(|sst| {
105 sst.table_ids
106 .iter()
107 .any(|tid| time_travel_table_ids.contains(tid))
108 });
109 }
110 let mut levels = levels.clone();
111 for level in &mut levels.levels {
112 rewrite_level(level, time_travel_table_ids);
113 }
114 {
115 let l0 = &mut levels.l0;
116 for sub_level in &mut l0.sub_levels {
117 rewrite_level(sub_level, time_travel_table_ids);
118 }
119 l0.sub_levels.retain(|s| !s.table_infos.is_empty());
120 }
121 PbLevels::from(levels).into()
122}
123
124pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon<SstableIdInVersion>;
128
129impl From<(&HummockVersionDelta, &HashSet<StateTableId>)> for IncompleteHummockVersionDelta {
131 fn from(p: (&HummockVersionDelta, &HashSet<StateTableId>)) -> Self {
132 let (delta, time_travel_table_ids) = p;
133 #[expect(deprecated)]
134 Self {
135 id: delta.id,
136 prev_id: delta.prev_id,
137 group_deltas: delta
138 .group_deltas
139 .iter()
140 .map(|(cg_id, deltas)| {
141 let deltas = rewrite_group_deltas(deltas, time_travel_table_ids);
142 (*cg_id, deltas)
143 })
144 .collect(),
145 max_committed_epoch: delta.max_committed_epoch,
146 trivial_move: delta.trivial_move,
147 new_table_watermarks: delta.new_table_watermarks.clone(),
148 removed_table_ids: delta.removed_table_ids.clone(),
149 change_log_delta: delta
150 .change_log_delta
151 .iter()
152 .filter_map(|(table_id, log_delta)| {
153 if !time_travel_table_ids.contains(table_id) {
154 return None;
155 }
156 debug_assert!(
157 log_delta
158 .new_log
159 .new_value
160 .iter()
161 .chain(log_delta.new_log.old_value.iter())
162 .all(|s| {
163 s.table_ids
164 .iter()
165 .any(|tid| time_travel_table_ids.contains(tid))
166 })
167 );
168
169 Some((*table_id, PbChangeLogDelta::from(log_delta).into()))
170 })
171 .collect(),
172 state_table_info_delta: delta.state_table_info_delta.clone(),
173 vector_index_delta: delta.vector_index_delta.clone(),
174 }
175 }
176}
177
178fn rewrite_group_deltas(
180 group_deltas: &GroupDeltas,
181 time_travel_table_ids: &HashSet<StateTableId>,
182) -> GroupDeltasCommon<SstableIdInVersion> {
183 let mut group_deltas = group_deltas.clone();
184 for group_delta in &mut group_deltas.group_deltas {
185 let GroupDelta::NewL0SubLevel(inserted_table_infos) = group_delta else {
186 tracing::error!(?group_delta, "unexpected delta type");
187 continue;
188 };
189 inserted_table_infos.retain(|sst| {
190 sst.table_ids
191 .iter()
192 .any(|tid| time_travel_table_ids.contains(tid))
193 });
194 }
195 PbGroupDeltas::from(group_deltas).into()
196}
197
198pub struct SstableIdInVersion {
199 sst_id: HummockSstableId,
200 object_id: HummockSstableObjectId,
201}
202
203impl SstableIdReader for SstableIdInVersion {
204 fn sst_id(&self) -> HummockSstableId {
205 self.sst_id
206 }
207}
208
209impl ObjectIdReader for SstableIdInVersion {
210 fn object_id(&self) -> HummockSstableObjectId {
211 self.object_id
212 }
213}
214
215impl From<&SstableIdInVersion> for PbSstableInfo {
216 fn from(sst_id: &SstableIdInVersion) -> Self {
217 Self {
218 sst_id: sst_id.sst_id.inner(),
219 object_id: sst_id.object_id.inner(),
220 ..Default::default()
221 }
222 }
223}
224
225impl From<SstableIdInVersion> for PbSstableInfo {
226 fn from(sst_id: SstableIdInVersion) -> Self {
227 (&sst_id).into()
228 }
229}
230
231impl From<&PbSstableInfo> for SstableIdInVersion {
232 fn from(s: &PbSstableInfo) -> Self {
233 SstableIdInVersion {
234 sst_id: s.sst_id.into(),
235 object_id: s.object_id.into(),
236 }
237 }
238}
239
240impl From<PbSstableInfo> for SstableIdInVersion {
241 fn from(value: PbSstableInfo) -> Self {
242 (&value).into()
243 }
244}