risingwave_storage/hummock/local_version/
recent_versions.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::sync::Arc;
17
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::HummockEpoch;
20
21use crate::hummock::local_version::pinned_version::PinnedVersion;
22use crate::monitor::HummockStateStoreMetrics;
23
24pub struct RecentVersions {
25    latest_version: PinnedVersion,
26    is_latest_committed: bool,
27    recent_versions: Vec<PinnedVersion>, // earlier version at the front
28    max_version_num: usize,
29    metric: Arc<HummockStateStoreMetrics>,
30}
31
32impl RecentVersions {
33    pub fn new(
34        version: PinnedVersion,
35        max_version_num: usize,
36        metric: Arc<HummockStateStoreMetrics>,
37    ) -> Self {
38        assert!(max_version_num > 0);
39        Self {
40            latest_version: version,
41            is_latest_committed: true, // The first version is always treated as committed epochs
42            recent_versions: Vec::new(),
43            max_version_num,
44            metric,
45        }
46    }
47
48    fn has_table_committed(&self, new_version: &PinnedVersion) -> bool {
49        let mut has_table_committed = false;
50        for (table_id, info) in new_version.state_table_info.info() {
51            if let Some(prev_info) = self.latest_version.state_table_info.info().get(table_id) {
52                match info.committed_epoch.cmp(&prev_info.committed_epoch) {
53                    Ordering::Less => {
54                        unreachable!(
55                            "table {} has regress committed epoch {}, prev committed epoch {}",
56                            table_id, info.committed_epoch, prev_info.committed_epoch
57                        );
58                    }
59                    Ordering::Equal => {}
60                    Ordering::Greater => {
61                        has_table_committed = true;
62                    }
63                }
64            } else {
65                has_table_committed = true;
66            }
67        }
68        has_table_committed
69    }
70
71    #[must_use]
72    pub fn with_new_version(&self, version: PinnedVersion) -> Self {
73        assert!(version.id > self.latest_version.id);
74        let is_committed = self.has_table_committed(&version);
75        let recent_versions = if self.is_latest_committed {
76            let prev_recent_versions = if self.recent_versions.len() >= self.max_version_num {
77                assert_eq!(self.recent_versions.len(), self.max_version_num);
78                &self.recent_versions[1..]
79            } else {
80                &self.recent_versions[..]
81            };
82            let mut recent_versions = Vec::with_capacity(prev_recent_versions.len() + 1);
83            recent_versions.extend(prev_recent_versions.iter().cloned());
84            recent_versions.push(self.latest_version.clone());
85            recent_versions
86        } else {
87            self.recent_versions.clone()
88        };
89        Self {
90            latest_version: version,
91            is_latest_committed: is_committed,
92            recent_versions,
93            max_version_num: self.max_version_num,
94            metric: self.metric.clone(),
95        }
96    }
97
98    pub fn latest_version(&self) -> &PinnedVersion {
99        &self.latest_version
100    }
101
102    /// Return the latest version that is safe to read `epoch` on `table_id`.
103    ///
104    /// `safe to read` means that the `committed_epoch` of the `table_id` in the version won't be greater than the given `epoch`
105    pub fn get_safe_version(
106        &self,
107        table_id: TableId,
108        epoch: HummockEpoch,
109    ) -> Option<PinnedVersion> {
110        let result = if let Some(info) = self.latest_version.state_table_info.info().get(&table_id)
111        {
112            if info.committed_epoch <= epoch {
113                Some(self.latest_version.clone())
114            } else {
115                self.get_safe_version_from_recent(table_id, epoch)
116            }
117        } else {
118            None
119        };
120        if result.is_some() {
121            self.metric.safe_version_hit.inc();
122        } else {
123            self.metric.safe_version_miss.inc();
124        }
125        result
126    }
127
128    fn get_safe_version_from_recent(
129        &self,
130        table_id: TableId,
131        epoch: HummockEpoch,
132    ) -> Option<PinnedVersion> {
133        if cfg!(debug_assertions) {
134            assert!(
135                epoch
136                    < self
137                        .latest_version
138                        .state_table_info
139                        .info()
140                        .get(&table_id)
141                        .expect("should exist")
142                        .committed_epoch
143            );
144        }
145        let result = self.recent_versions.binary_search_by(|version| {
146            let committed_epoch = version.table_committed_epoch(table_id);
147            if let Some(committed_epoch) = committed_epoch {
148                committed_epoch.cmp(&epoch)
149            } else {
150                // We have ensured that the table_id exists in the latest version, so if the table_id does not exist in a
151                // previous version, the table must have not created yet, and therefore has  less ordering.
152                Ordering::Less
153            }
154        });
155        match result {
156            Ok(index) => Some(self.recent_versions[index].clone()),
157            Err(index) => {
158                // `index` is index of the first version that has `committed_epoch` greater than `epoch`
159                // or `index` equals `recent_version.len()` when `epoch` is greater than all `committed_epoch`
160                let version = if index >= self.recent_versions.len() {
161                    assert_eq!(index, self.recent_versions.len());
162                    self.recent_versions.last().cloned()
163                } else if index == 0 {
164                    // The earliest version has a higher committed epoch
165                    None
166                } else {
167                    self.recent_versions.get(index - 1).cloned()
168                };
169                version.and_then(|version| {
170                    if version.state_table_info.info().contains_key(&table_id) {
171                        Some(version)
172                    } else {
173                        // if the table does not exist in the version, return `None` to try get a time travel version
174                        None
175                    }
176                })
177            }
178        }
179    }
180}
181
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
    use tokio::sync::mpsc::unbounded_channel;

    use crate::hummock::local_version::pinned_version::PinnedVersion;
    use crate::hummock::local_version::recent_versions::RecentVersions;
    use crate::monitor::HummockStateStoreMetrics;

    const TEST_TABLE_ID1: TableId = TableId::new(233);
    const TEST_TABLE_ID2: TableId = TableId::new(234);

    /// Builds a pinned version with id `version_id` in which every listed table has the given
    /// committed epoch and belongs to compaction group 0.
    fn gen_pin_version(
        version_id: u64,
        table_committed_epoch: impl IntoIterator<Item = (TableId, u64)>,
    ) -> PinnedVersion {
        let state_table_info: HashMap<_, _> = table_committed_epoch
            .into_iter()
            .map(|(table_id, committed_epoch)| {
                let info = StateTableInfo {
                    committed_epoch,
                    compaction_group_id: 0,
                };
                (table_id.table_id, info)
            })
            .collect();
        let pb_version = PbHummockVersion {
            id: version_id,
            state_table_info,
            ..Default::default()
        };
        PinnedVersion::new(
            HummockVersion::from_rpc_protobuf(&pb_version),
            unbounded_channel().0,
        )
    }

    /// Checks that each `(table, epoch)` query resolves to the expected version id.
    ///
    /// Queries at epoch 0 on both test tables are always appended and expected to miss.
    fn assert_query_equal(
        recent_version: &RecentVersions,
        expected: &[(TableId, u64, Option<&PinnedVersion>)],
    ) {
        let always_miss: [(TableId, u64, Option<&PinnedVersion>); 2] =
            [(TEST_TABLE_ID1, 0, None), (TEST_TABLE_ID2, 0, None)];
        for (table_id, epoch, expected_version) in expected.iter().cloned().chain(always_miss) {
            let actual_version = recent_version.get_safe_version(table_id, epoch);
            let actual_id = actual_version.as_ref().map(|version| version.id());
            let expected_id = expected_version.map(|version| version.id());
            assert_eq!(actual_id, expected_id);
        }
    }

    #[test]
    fn test_basic() {
        let epoch1: u64 = 233;
        let epoch0 = epoch1 - 1;
        let epoch2 = epoch1 + 1;
        let epoch3 = epoch2 + 1;
        let epoch4 = epoch3 + 1;

        // Version 1 is the initial version; at most 2 historical versions are retained.
        let version1 = gen_pin_version(1, [(TEST_TABLE_ID1, epoch1)]);
        let after_v1 = RecentVersions::new(
            version1.clone(),
            2,
            HummockStateStoreMetrics::unused().into(),
        );
        assert!(after_v1.recent_versions.is_empty());
        assert!(after_v1.is_latest_committed);
        assert_query_equal(
            &after_v1,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, Some(&version1)),
                (TEST_TABLE_ID1, epoch2, Some(&version1)),
            ],
        );

        // Version 2 commits nothing new, so it is not marked as committed.
        let version2 = gen_pin_version(2, [(TEST_TABLE_ID1, epoch1)]);
        let after_v2 = after_v1.with_new_version(version2);
        assert_eq!(after_v2.recent_versions.len(), 1);
        assert!(!after_v2.is_latest_committed);

        // Version 3 advances table 1; the uncommitted version 2 is not added to the history.
        let version3 = gen_pin_version(3, [(TEST_TABLE_ID1, epoch2)]);
        let after_v3 = after_v2.with_new_version(version3.clone());
        assert_eq!(after_v3.recent_versions.len(), 1);
        assert!(after_v3.is_latest_committed);
        assert_query_equal(
            &after_v3,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, Some(&version1)),
                (TEST_TABLE_ID1, epoch2, Some(&version3)),
                (TEST_TABLE_ID1, epoch3, Some(&version3)),
            ],
        );

        // Version 4 introduces table 2 while table 1 stays at the same committed epoch.
        let version4 = gen_pin_version(4, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch2)]);
        let after_v4 = after_v3.with_new_version(version4.clone());
        assert_eq!(after_v4.recent_versions.len(), 2);
        assert!(after_v4.is_latest_committed);
        assert_query_equal(
            &after_v4,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, Some(&version1)),
                (TEST_TABLE_ID1, epoch2, Some(&version4)),
                (TEST_TABLE_ID1, epoch3, Some(&version4)),
                (TEST_TABLE_ID2, epoch0, None),
                (TEST_TABLE_ID2, epoch1, Some(&version4)),
                (TEST_TABLE_ID2, epoch2, Some(&version4)),
            ],
        );

        // Version 5 advances table 1 again; the history is full, so version 1 is evicted.
        let version5 = gen_pin_version(5, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch3)]);
        let after_v5 = after_v4.with_new_version(version5.clone());
        assert_eq!(after_v5.recent_versions.len(), 2);
        assert!(after_v5.is_latest_committed);
        assert_query_equal(
            &after_v5,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, None),
                (TEST_TABLE_ID1, epoch2, Some(&version4)),
                (TEST_TABLE_ID1, epoch3, Some(&version5)),
                (TEST_TABLE_ID1, epoch4, Some(&version5)),
                (TEST_TABLE_ID2, epoch0, None),
                (TEST_TABLE_ID2, epoch1, Some(&version5)),
                (TEST_TABLE_ID2, epoch2, Some(&version5)),
            ],
        );
    }
}