risingwave_hummock_sdk/
time_travel.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_pb::hummock::hummock_version::PbLevels;
18use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas};
19use risingwave_pb::hummock::{PbLevel, PbSstableInfo, group_delta};
20
21use crate::compaction_group::StateTableId;
22use crate::level::Level;
23use crate::sstable_info::SstableInfo;
24use crate::version::{
25    HummockVersion, HummockVersionCommon, HummockVersionDelta, HummockVersionDeltaCommon,
26    ObjectIdReader, SstableIdReader,
27};
28use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId};
29
/// Version metadata whose SST entries are [`SstableIdInVersion`] rather than full `SstableInfo`.
///
/// Built from a [`HummockVersion`] through the `From` impl in this file.
pub type IncompleteHummockVersion = HummockVersionCommon<SstableIdInVersion>;
31
32/// Populates `SstableInfo` for `table_id`.
33/// `SstableInfo` not associated with `table_id` is removed.
34pub fn refill_version(
35    version: &mut HummockVersion,
36    sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
37    table_id: u32,
38) {
39    for level in version.levels.values_mut().flat_map(|level| {
40        level
41            .l0
42            .sub_levels
43            .iter_mut()
44            .rev()
45            .chain(level.levels.iter_mut())
46    }) {
47        refill_level(level, sst_id_to_info);
48        level
49            .table_infos
50            .retain(|t| t.table_ids.contains(&table_id));
51    }
52}
53
54fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>) {
55    for s in &mut level.table_infos {
56        refill_sstable_info(s, sst_id_to_info);
57    }
58}
59
60/// Caller should ensure `sst_id_to_info` includes an entry corresponding to `sstable_info`.
61fn refill_sstable_info(
62    sstable_info: &mut SstableInfo,
63    sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
64) {
65    *sstable_info = sst_id_to_info
66        .get(&sstable_info.sst_id)
67        .unwrap_or_else(|| panic!("SstableInfo should exist"))
68        .clone();
69}
70
71/// `SStableInfo` will be stripped.
72impl From<(&HummockVersion, &HashSet<StateTableId>)> for IncompleteHummockVersion {
73    fn from(p: (&HummockVersion, &HashSet<StateTableId>)) -> Self {
74        let (version, time_travel_table_ids) = p;
75        #[expect(deprecated)]
76        Self {
77            id: version.id,
78            levels: version
79                .levels
80                .iter()
81                .map(|(group_id, levels)| {
82                    let pblevels = rewrite_levels(PbLevels::from(levels), time_travel_table_ids);
83                    (*group_id as CompactionGroupId, pblevels.into())
84                })
85                .collect(),
86            max_committed_epoch: version.max_committed_epoch,
87            table_watermarks: version.table_watermarks.clone(),
88            // time travel metadata doesn't include table change log
89            table_change_log: HashMap::default(),
90            state_table_info: version.state_table_info.clone(),
91        }
92    }
93}
94
95/// Removes SST refs that don't contain any of `time_travel_table_ids`.
96fn rewrite_levels(mut levels: PbLevels, time_travel_table_ids: &HashSet<StateTableId>) -> PbLevels {
97    fn rewrite_level(level: &mut PbLevel, time_travel_table_ids: &HashSet<StateTableId>) {
98        // The stats like `total_file_size` are not updated accordingly since they won't be used in time travel query.
99        level.table_infos.retain(|sst| {
100            sst.table_ids
101                .iter()
102                .any(|tid| time_travel_table_ids.contains(tid))
103        });
104    }
105    for level in &mut levels.levels {
106        rewrite_level(level, time_travel_table_ids);
107    }
108    if let Some(l0) = levels.l0.as_mut() {
109        for sub_level in &mut l0.sub_levels {
110            rewrite_level(sub_level, time_travel_table_ids);
111        }
112        l0.sub_levels.retain(|s| !s.table_infos.is_empty());
113    }
114    levels
115}
116
/// [`IncompleteHummockVersionDelta`] is incomplete because each `SstableInfo` is reduced to a
/// [`SstableIdInVersion`] (only `sst_id` and `object_id` are kept) in the following fields:
/// - `PbGroupDeltas`
/// - `ChangeLogDelta`
pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon<SstableIdInVersion>;
121
122/// `SStableInfo` will be stripped.
123impl From<(&HummockVersionDelta, &HashSet<StateTableId>)> for IncompleteHummockVersionDelta {
124    fn from(p: (&HummockVersionDelta, &HashSet<StateTableId>)) -> Self {
125        let (delta, time_travel_table_ids) = p;
126        #[expect(deprecated)]
127        Self {
128            id: delta.id,
129            prev_id: delta.prev_id,
130            group_deltas: delta
131                .group_deltas
132                .iter()
133                .map(|(cg_id, deltas)| {
134                    let pb_group_deltas =
135                        rewrite_group_deltas(PbGroupDeltas::from(deltas), time_travel_table_ids);
136                    (*cg_id, pb_group_deltas.into())
137                })
138                .collect(),
139            max_committed_epoch: delta.max_committed_epoch,
140            trivial_move: delta.trivial_move,
141            new_table_watermarks: delta.new_table_watermarks.clone(),
142            removed_table_ids: delta.removed_table_ids.clone(),
143            change_log_delta: delta
144                .change_log_delta
145                .iter()
146                .filter_map(|(table_id, log_delta)| {
147                    if !time_travel_table_ids.contains(&table_id.table_id()) {
148                        return None;
149                    }
150                    debug_assert!(
151                        log_delta
152                            .new_log
153                            .new_value
154                            .iter()
155                            .chain(log_delta.new_log.old_value.iter())
156                            .all(|s| {
157                                s.table_ids
158                                    .iter()
159                                    .any(|tid| time_travel_table_ids.contains(tid))
160                            })
161                    );
162
163                    Some((*table_id, PbChangeLogDelta::from(log_delta).into()))
164                })
165                .collect(),
166            state_table_info_delta: delta.state_table_info_delta.clone(),
167        }
168    }
169}
170
171/// Removes SST refs that don't contain any of `time_travel_table_ids`.
172fn rewrite_group_deltas(
173    mut group_deltas: PbGroupDeltas,
174    time_travel_table_ids: &HashSet<StateTableId>,
175) -> PbGroupDeltas {
176    for group_delta in &mut group_deltas.group_deltas {
177        let Some(group_delta::DeltaType::NewL0SubLevel(new_sub_level)) =
178            &mut group_delta.delta_type
179        else {
180            tracing::error!(?group_delta, "unexpected delta type");
181            continue;
182        };
183        new_sub_level.inserted_table_infos.retain(|sst| {
184            sst.table_ids
185                .iter()
186                .any(|tid| time_travel_table_ids.contains(tid))
187        });
188    }
189    group_deltas
190}
191
/// The identifiers kept for an SST once the rest of its `SstableInfo` has been stripped.
///
/// Converting from a `PbSstableInfo` copies only these two fields; converting back fills every
/// other field with its default value.
pub struct SstableIdInVersion {
    /// Copied from `PbSstableInfo::sst_id`.
    sst_id: HummockSstableId,
    /// Copied from `PbSstableInfo::object_id`.
    object_id: HummockSstableObjectId,
}
196
197impl SstableIdReader for SstableIdInVersion {
198    fn sst_id(&self) -> HummockSstableId {
199        self.sst_id
200    }
201}
202
203impl ObjectIdReader for SstableIdInVersion {
204    fn object_id(&self) -> HummockSstableObjectId {
205        self.object_id
206    }
207}
208
209impl From<&SstableIdInVersion> for PbSstableInfo {
210    fn from(sst_id: &SstableIdInVersion) -> Self {
211        Self {
212            sst_id: sst_id.sst_id,
213            object_id: sst_id.object_id,
214            ..Default::default()
215        }
216    }
217}
218
219impl From<SstableIdInVersion> for PbSstableInfo {
220    fn from(sst_id: SstableIdInVersion) -> Self {
221        (&sst_id).into()
222    }
223}
224
225impl From<&PbSstableInfo> for SstableIdInVersion {
226    fn from(s: &PbSstableInfo) -> Self {
227        SstableIdInVersion {
228            sst_id: s.sst_id,
229            object_id: s.object_id,
230        }
231    }
232}
233
234impl From<PbSstableInfo> for SstableIdInVersion {
235    fn from(value: PbSstableInfo) -> Self {
236        (&value).into()
237    }
238}