risingwave_hummock_sdk/
time_travel.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_pb::hummock::hummock_version::PbLevels;
18use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas};
19use risingwave_pb::hummock::{PbLevel, PbSstableInfo, group_delta};
20
21use crate::compaction_group::StateTableId;
22use crate::level::Level;
23use crate::sstable_info::SstableInfo;
24use crate::version::{
25    HummockVersion, HummockVersionCommon, HummockVersionDelta, HummockVersionDeltaCommon,
26    ObjectIdReader, SstableIdReader,
27};
28use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId};
29
/// A [`HummockVersionCommon`] whose SST entries are [`SstableIdInVersion`], i.e. they carry only
/// `sst_id` and `object_id` instead of the full `SstableInfo`.
pub type IncompleteHummockVersion = HummockVersionCommon<SstableIdInVersion>;
31
32/// Populates `SstableInfo` for `table_id`.
33/// `SstableInfo` not associated with `table_id` is removed.
34pub fn refill_version(
35    version: &mut HummockVersion,
36    sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
37    table_id: u32,
38) {
39    for level in version.levels.values_mut().flat_map(|level| {
40        level
41            .l0
42            .sub_levels
43            .iter_mut()
44            .rev()
45            .chain(level.levels.iter_mut())
46    }) {
47        refill_level(level, sst_id_to_info);
48        level
49            .table_infos
50            .retain(|t| t.table_ids.contains(&table_id));
51    }
52}
53
54fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>) {
55    for s in &mut level.table_infos {
56        refill_sstable_info(s, sst_id_to_info);
57    }
58}
59
60/// Caller should ensure `sst_id_to_info` includes an entry corresponding to `sstable_info`.
61fn refill_sstable_info(
62    sstable_info: &mut SstableInfo,
63    sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
64) {
65    *sstable_info = sst_id_to_info
66        .get(&sstable_info.sst_id)
67        .unwrap_or_else(|| panic!("SstableInfo should exist"))
68        .clone();
69}
70
71/// `SStableInfo` will be stripped.
72impl From<(&HummockVersion, &HashSet<StateTableId>)> for IncompleteHummockVersion {
73    fn from(p: (&HummockVersion, &HashSet<StateTableId>)) -> Self {
74        let (version, time_travel_table_ids) = p;
75        #[expect(deprecated)]
76        Self {
77            id: version.id,
78            levels: version
79                .levels
80                .iter()
81                .map(|(group_id, levels)| {
82                    let pblevels = rewrite_levels(PbLevels::from(levels), time_travel_table_ids);
83                    (*group_id as CompactionGroupId, pblevels.into())
84                })
85                .collect(),
86            max_committed_epoch: version.max_committed_epoch,
87            table_watermarks: version.table_watermarks.clone(),
88            // time travel metadata doesn't include table change log
89            table_change_log: HashMap::default(),
90            state_table_info: version.state_table_info.clone(),
91            vector_indexes: version.vector_indexes.clone(),
92        }
93    }
94}
95
96/// Removes SST refs that don't contain any of `time_travel_table_ids`.
97fn rewrite_levels(mut levels: PbLevels, time_travel_table_ids: &HashSet<StateTableId>) -> PbLevels {
98    fn rewrite_level(level: &mut PbLevel, time_travel_table_ids: &HashSet<StateTableId>) {
99        // The stats like `total_file_size` are not updated accordingly since they won't be used in time travel query.
100        level.table_infos.retain(|sst| {
101            sst.table_ids
102                .iter()
103                .any(|tid| time_travel_table_ids.contains(tid))
104        });
105    }
106    for level in &mut levels.levels {
107        rewrite_level(level, time_travel_table_ids);
108    }
109    if let Some(l0) = levels.l0.as_mut() {
110        for sub_level in &mut l0.sub_levels {
111            rewrite_level(sub_level, time_travel_table_ids);
112        }
113        l0.sub_levels.retain(|s| !s.table_infos.is_empty());
114    }
115    levels
116}
117
/// [`IncompleteHummockVersionDelta`] is incomplete because each `SstableInfo` is reduced to an
/// [`SstableIdInVersion`], i.e. only `sst_id` and `object_id` are kept, in the following fields:
/// - `PbGroupDeltas`
/// - `ChangeLogDelta`
pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon<SstableIdInVersion>;
122
123/// `SStableInfo` will be stripped.
124impl From<(&HummockVersionDelta, &HashSet<StateTableId>)> for IncompleteHummockVersionDelta {
125    fn from(p: (&HummockVersionDelta, &HashSet<StateTableId>)) -> Self {
126        let (delta, time_travel_table_ids) = p;
127        #[expect(deprecated)]
128        Self {
129            id: delta.id,
130            prev_id: delta.prev_id,
131            group_deltas: delta
132                .group_deltas
133                .iter()
134                .map(|(cg_id, deltas)| {
135                    let pb_group_deltas =
136                        rewrite_group_deltas(PbGroupDeltas::from(deltas), time_travel_table_ids);
137                    (*cg_id, pb_group_deltas.into())
138                })
139                .collect(),
140            max_committed_epoch: delta.max_committed_epoch,
141            trivial_move: delta.trivial_move,
142            new_table_watermarks: delta.new_table_watermarks.clone(),
143            removed_table_ids: delta.removed_table_ids.clone(),
144            change_log_delta: delta
145                .change_log_delta
146                .iter()
147                .filter_map(|(table_id, log_delta)| {
148                    if !time_travel_table_ids.contains(&table_id.table_id()) {
149                        return None;
150                    }
151                    debug_assert!(
152                        log_delta
153                            .new_log
154                            .new_value
155                            .iter()
156                            .chain(log_delta.new_log.old_value.iter())
157                            .all(|s| {
158                                s.table_ids
159                                    .iter()
160                                    .any(|tid| time_travel_table_ids.contains(tid))
161                            })
162                    );
163
164                    Some((*table_id, PbChangeLogDelta::from(log_delta).into()))
165                })
166                .collect(),
167            state_table_info_delta: delta.state_table_info_delta.clone(),
168            vector_index_delta: delta.vector_index_delta.clone(),
169        }
170    }
171}
172
173/// Removes SST refs that don't contain any of `time_travel_table_ids`.
174fn rewrite_group_deltas(
175    mut group_deltas: PbGroupDeltas,
176    time_travel_table_ids: &HashSet<StateTableId>,
177) -> PbGroupDeltas {
178    for group_delta in &mut group_deltas.group_deltas {
179        let Some(group_delta::DeltaType::NewL0SubLevel(new_sub_level)) =
180            &mut group_delta.delta_type
181        else {
182            tracing::error!(?group_delta, "unexpected delta type");
183            continue;
184        };
185        new_sub_level.inserted_table_infos.retain(|sst| {
186            sst.table_ids
187                .iter()
188                .any(|tid| time_travel_table_ids.contains(tid))
189        });
190    }
191    group_deltas
192}
193
/// Minimal reference to an SST: just its `sst_id` and `object_id`.
///
/// Converting it to `PbSstableInfo` leaves every other field at its default value.
pub struct SstableIdInVersion {
    /// Id of the SST.
    sst_id: HummockSstableId,
    /// Object id recorded alongside `sst_id`.
    object_id: HummockSstableObjectId,
}
198
199impl SstableIdReader for SstableIdInVersion {
200    fn sst_id(&self) -> HummockSstableId {
201        self.sst_id
202    }
203}
204
205impl ObjectIdReader for SstableIdInVersion {
206    fn object_id(&self) -> HummockSstableObjectId {
207        self.object_id
208    }
209}
210
211impl From<&SstableIdInVersion> for PbSstableInfo {
212    fn from(sst_id: &SstableIdInVersion) -> Self {
213        Self {
214            sst_id: sst_id.sst_id.inner(),
215            object_id: sst_id.object_id.inner(),
216            ..Default::default()
217        }
218    }
219}
220
221impl From<SstableIdInVersion> for PbSstableInfo {
222    fn from(sst_id: SstableIdInVersion) -> Self {
223        (&sst_id).into()
224    }
225}
226
227impl From<&PbSstableInfo> for SstableIdInVersion {
228    fn from(s: &PbSstableInfo) -> Self {
229        SstableIdInVersion {
230            sst_id: s.sst_id.into(),
231            object_id: s.object_id.into(),
232        }
233    }
234}
235
236impl From<PbSstableInfo> for SstableIdInVersion {
237    fn from(value: PbSstableInfo) -> Self {
238        (&value).into()
239    }
240}