risingwave_hummock_sdk/
time_travel.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::PbSstableInfo;
19use risingwave_pb::hummock::hummock_version::PbLevels;
20use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas};
21
22use crate::compaction_group::StateTableId;
23use crate::level::{Level, Levels, LevelsCommon};
24use crate::sstable_info::SstableInfo;
25use crate::version::{
26    GroupDelta, GroupDeltas, GroupDeltasCommon, HummockVersion, HummockVersionCommon,
27    HummockVersionDelta, HummockVersionDeltaCommon, ObjectIdReader, SstableIdReader,
28};
29use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId};
30
/// A `HummockVersionCommon` whose SSTs are represented by [`SstableIdInVersion`] rather than
/// by full `SstableInfo`s.
pub type IncompleteHummockVersion = HummockVersionCommon<SstableIdInVersion>;
32
33/// Populates `SstableInfo` for `table_id`.
34/// `SstableInfo` not associated with `table_id` is removed.
35pub fn refill_version(
36    version: &mut HummockVersion,
37    sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
38    table_id: TableId,
39) {
40    for level in version.levels.values_mut().flat_map(|level| {
41        level
42            .l0
43            .sub_levels
44            .iter_mut()
45            .rev()
46            .chain(level.levels.iter_mut())
47    }) {
48        refill_level(level, sst_id_to_info);
49        level
50            .table_infos
51            .retain(|t| t.table_ids.contains(&table_id));
52    }
53}
54
55fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>) {
56    for s in &mut level.table_infos {
57        refill_sstable_info(s, sst_id_to_info);
58    }
59}
60
61/// Caller should ensure `sst_id_to_info` includes an entry corresponding to `sstable_info`.
62fn refill_sstable_info(
63    sstable_info: &mut SstableInfo,
64    sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
65) {
66    *sstable_info = sst_id_to_info
67        .get(&sstable_info.sst_id)
68        .unwrap_or_else(|| panic!("SstableInfo should exist"))
69        .clone();
70}
71
72/// `SStableInfo` will be stripped.
73impl From<(&HummockVersion, &HashSet<StateTableId>)> for IncompleteHummockVersion {
74    fn from(p: (&HummockVersion, &HashSet<StateTableId>)) -> Self {
75        let (version, time_travel_table_ids) = p;
76        #[expect(deprecated)]
77        Self {
78            id: version.id,
79            levels: version
80                .levels
81                .iter()
82                .map(|(group_id, levels)| {
83                    let levels = rewrite_levels(levels, time_travel_table_ids);
84                    (*group_id as CompactionGroupId, levels)
85                })
86                .collect(),
87            max_committed_epoch: version.max_committed_epoch,
88            table_watermarks: version.table_watermarks.clone(),
89            // time travel metadata doesn't include table change log
90            table_change_log: HashMap::default(),
91            state_table_info: version.state_table_info.clone(),
92            vector_indexes: version.vector_indexes.clone(),
93        }
94    }
95}
96
97/// Removes SST refs that don't contain any of `time_travel_table_ids`.
98fn rewrite_levels(
99    levels: &Levels,
100    time_travel_table_ids: &HashSet<StateTableId>,
101) -> LevelsCommon<SstableIdInVersion> {
102    fn rewrite_level(level: &mut Level, time_travel_table_ids: &HashSet<StateTableId>) {
103        // The stats like `total_file_size` are not updated accordingly since they won't be used in time travel query.
104        level.table_infos.retain(|sst| {
105            sst.table_ids
106                .iter()
107                .any(|tid| time_travel_table_ids.contains(tid))
108        });
109    }
110    let mut levels = levels.clone();
111    for level in &mut levels.levels {
112        rewrite_level(level, time_travel_table_ids);
113    }
114    {
115        let l0 = &mut levels.l0;
116        for sub_level in &mut l0.sub_levels {
117            rewrite_level(sub_level, time_travel_table_ids);
118        }
119        l0.sub_levels.retain(|s| !s.table_infos.is_empty());
120    }
121    PbLevels::from(levels).into()
122}
123
/// [`IncompleteHummockVersionDelta`] is incomplete because its SSTs are represented by
/// [`SstableIdInVersion`], which only carries the `sst_id` and `object_id` of an `SstableInfo`.
/// This applies to the SSTs in the following fields:
/// - `group_deltas` (`PbGroupDeltas`)
/// - `change_log_delta` (`ChangeLogDelta`)
pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon<SstableIdInVersion>;
128
129/// `SStableInfo` will be stripped.
130impl From<(&HummockVersionDelta, &HashSet<StateTableId>)> for IncompleteHummockVersionDelta {
131    fn from(p: (&HummockVersionDelta, &HashSet<StateTableId>)) -> Self {
132        let (delta, time_travel_table_ids) = p;
133        #[expect(deprecated)]
134        Self {
135            id: delta.id,
136            prev_id: delta.prev_id,
137            group_deltas: delta
138                .group_deltas
139                .iter()
140                .map(|(cg_id, deltas)| {
141                    let deltas = rewrite_group_deltas(deltas, time_travel_table_ids);
142                    (*cg_id, deltas)
143                })
144                .collect(),
145            max_committed_epoch: delta.max_committed_epoch,
146            trivial_move: delta.trivial_move,
147            new_table_watermarks: delta.new_table_watermarks.clone(),
148            removed_table_ids: delta.removed_table_ids.clone(),
149            change_log_delta: delta
150                .change_log_delta
151                .iter()
152                .filter_map(|(table_id, log_delta)| {
153                    if !time_travel_table_ids.contains(table_id) {
154                        return None;
155                    }
156                    debug_assert!(
157                        log_delta
158                            .new_log
159                            .new_value
160                            .iter()
161                            .chain(log_delta.new_log.old_value.iter())
162                            .all(|s| {
163                                s.table_ids
164                                    .iter()
165                                    .any(|tid| time_travel_table_ids.contains(tid))
166                            })
167                    );
168
169                    Some((*table_id, PbChangeLogDelta::from(log_delta).into()))
170                })
171                .collect(),
172            state_table_info_delta: delta.state_table_info_delta.clone(),
173            vector_index_delta: delta.vector_index_delta.clone(),
174        }
175    }
176}
177
178/// Removes SST refs that don't contain any of `time_travel_table_ids`.
179fn rewrite_group_deltas(
180    group_deltas: &GroupDeltas,
181    time_travel_table_ids: &HashSet<StateTableId>,
182) -> GroupDeltasCommon<SstableIdInVersion> {
183    let mut group_deltas = group_deltas.clone();
184    for group_delta in &mut group_deltas.group_deltas {
185        let GroupDelta::NewL0SubLevel(inserted_table_infos) = group_delta else {
186            tracing::error!(?group_delta, "unexpected delta type");
187            continue;
188        };
189        inserted_table_infos.retain(|sst| {
190            sst.table_ids
191                .iter()
192                .any(|tid| time_travel_table_ids.contains(tid))
193        });
194    }
195    PbGroupDeltas::from(group_deltas).into()
196}
197
/// The part of an SST kept in [`IncompleteHummockVersion`] and
/// [`IncompleteHummockVersionDelta`]: its SST id and its object id.
pub struct SstableIdInVersion {
    /// Id of the SST; converted to and from `PbSstableInfo::sst_id`.
    sst_id: HummockSstableId,
    /// Id of the SST's object; converted to and from `PbSstableInfo::object_id`.
    object_id: HummockSstableObjectId,
}
202
203impl SstableIdReader for SstableIdInVersion {
204    fn sst_id(&self) -> HummockSstableId {
205        self.sst_id
206    }
207}
208
209impl ObjectIdReader for SstableIdInVersion {
210    fn object_id(&self) -> HummockSstableObjectId {
211        self.object_id
212    }
213}
214
215impl From<&SstableIdInVersion> for PbSstableInfo {
216    fn from(sst_id: &SstableIdInVersion) -> Self {
217        Self {
218            sst_id: sst_id.sst_id.inner(),
219            object_id: sst_id.object_id.inner(),
220            ..Default::default()
221        }
222    }
223}
224
225impl From<SstableIdInVersion> for PbSstableInfo {
226    fn from(sst_id: SstableIdInVersion) -> Self {
227        (&sst_id).into()
228    }
229}
230
231impl From<&PbSstableInfo> for SstableIdInVersion {
232    fn from(s: &PbSstableInfo) -> Self {
233        SstableIdInVersion {
234            sst_id: s.sst_id.into(),
235            object_id: s.object_id.into(),
236        }
237    }
238}
239
240impl From<PbSstableInfo> for SstableIdInVersion {
241    fn from(value: PbSstableInfo) -> Self {
242        (&value).into()
243    }
244}