// risingwave_storage/hummock/local_version/recent_versions.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::cmp::Ordering;
16use std::sync::Arc;
17
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::HummockEpoch;
20
21use crate::hummock::local_version::pinned_version::PinnedVersion;
22use crate::monitor::HummockStateStoreMetrics;
23
/// Tracks the latest pinned version together with a bounded history of earlier versions.
///
/// Reads at an epoch older than a table's committed epoch in the latest version can still
/// be served from a retained version that is safe to read for that epoch.
pub struct RecentVersions {
    /// The newest version known to this tracker.
    latest_version: PinnedVersion,
    /// Whether `latest_version` advanced the committed epoch of at least one table over its
    /// predecessor (or introduced a new table). The initial version is always treated as
    /// committed.
    is_latest_committed: bool,
    /// Earlier versions that were committed when they were the latest one; never holds more
    /// than `max_version_num` entries.
    recent_versions: Vec<PinnedVersion>, // earlier version at the front
    /// Upper bound on the length of `recent_versions`; must be greater than 0.
    max_version_num: usize,
    /// Metrics used to count hits and misses of safe-version lookups.
    metric: Arc<HummockStateStoreMetrics>,
}
31
32impl RecentVersions {
33    pub fn new(
34        version: PinnedVersion,
35        max_version_num: usize,
36        metric: Arc<HummockStateStoreMetrics>,
37    ) -> Self {
38        assert!(max_version_num > 0);
39        Self {
40            latest_version: version,
41            is_latest_committed: true, // The first version is always treated as committed epochs
42            recent_versions: Vec::new(),
43            max_version_num,
44            metric,
45        }
46    }
47
48    fn has_table_committed(&self, new_version: &PinnedVersion) -> bool {
49        let mut has_table_committed = false;
50        for (table_id, info) in new_version.state_table_info.info() {
51            if let Some(prev_info) = self.latest_version.state_table_info.info().get(table_id) {
52                match info.committed_epoch.cmp(&prev_info.committed_epoch) {
53                    Ordering::Less => {
54                        unreachable!(
55                            "table {} has regress committed epoch {}, prev committed epoch {}",
56                            table_id, info.committed_epoch, prev_info.committed_epoch
57                        );
58                    }
59                    Ordering::Equal => {}
60                    Ordering::Greater => {
61                        has_table_committed = true;
62                    }
63                }
64            } else {
65                has_table_committed = true;
66            }
67        }
68        has_table_committed
69    }
70
71    #[must_use]
72    pub fn with_new_version(&self, version: PinnedVersion) -> Self {
73        assert!(version.id > self.latest_version.id);
74        let is_committed = self.has_table_committed(&version);
75        let recent_versions = if self.is_latest_committed {
76            let prev_recent_versions = if self.recent_versions.len() >= self.max_version_num {
77                assert_eq!(self.recent_versions.len(), self.max_version_num);
78                &self.recent_versions[1..]
79            } else {
80                &self.recent_versions[..]
81            };
82            let mut recent_versions = Vec::with_capacity(prev_recent_versions.len() + 1);
83            recent_versions.extend(prev_recent_versions.iter().cloned());
84            recent_versions.push(self.latest_version.clone());
85            recent_versions
86        } else {
87            self.recent_versions.clone()
88        };
89        Self {
90            latest_version: version,
91            is_latest_committed: is_committed,
92            recent_versions,
93            max_version_num: self.max_version_num,
94            metric: self.metric.clone(),
95        }
96    }
97
98    pub fn latest_version(&self) -> &PinnedVersion {
99        &self.latest_version
100    }
101
102    /// Return the latest version that is safe to read `epoch` on `table_id`.
103    ///
104    /// `safe to read` means that the `committed_epoch` of the `table_id` in the version won't be greater than the given `epoch`
105    pub fn get_safe_version(
106        &self,
107        table_id: TableId,
108        epoch: HummockEpoch,
109    ) -> Option<PinnedVersion> {
110        let result = if let Some(info) = self.latest_version.state_table_info.info().get(&table_id)
111        {
112            if info.committed_epoch <= epoch {
113                Some(self.latest_version.clone())
114            } else {
115                self.get_safe_version_from_recent(table_id, epoch)
116            }
117        } else {
118            None
119        };
120        if result.is_some() {
121            self.metric.safe_version_hit.inc();
122        } else {
123            self.metric.safe_version_miss.inc();
124        }
125        result
126    }
127
128    fn get_safe_version_from_recent(
129        &self,
130        table_id: TableId,
131        epoch: HummockEpoch,
132    ) -> Option<PinnedVersion> {
133        if cfg!(debug_assertions) {
134            assert!(
135                epoch
136                    < self
137                        .latest_version
138                        .state_table_info
139                        .info()
140                        .get(&table_id)
141                        .expect("should exist")
142                        .committed_epoch
143            );
144        }
145        let result = self.recent_versions.binary_search_by(|version| {
146            let committed_epoch = version.table_committed_epoch(table_id);
147            if let Some(committed_epoch) = committed_epoch {
148                committed_epoch.cmp(&epoch)
149            } else {
150                // We have ensured that the table_id exists in the latest version, so if the table_id does not exist in a
151                // previous version, the table must have not created yet, and therefore has  less ordering.
152                Ordering::Less
153            }
154        });
155        match result {
156            Ok(index) => Some(self.recent_versions[index].clone()),
157            Err(index) => {
158                // `index` is index of the first version that has `committed_epoch` greater than `epoch`
159                // or `index` equals `recent_version.len()` when `epoch` is greater than all `committed_epoch`
160                let version = if index >= self.recent_versions.len() {
161                    assert_eq!(index, self.recent_versions.len());
162                    self.recent_versions.last().cloned()
163                } else if index == 0 {
164                    // The earliest version has a higher committed epoch
165                    None
166                } else {
167                    self.recent_versions.get(index - 1).cloned()
168                };
169                // if the table does not exist in the version, return `None` to try get a time travel version
170                version.filter(|version| version.state_table_info.info().contains_key(&table_id))
171            }
172        }
173    }
174}
175
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
    use tokio::sync::mpsc::unbounded_channel;

    use crate::hummock::local_version::pinned_version::PinnedVersion;
    use crate::hummock::local_version::recent_versions::RecentVersions;
    use crate::monitor::HummockStateStoreMetrics;

    const TEST_TABLE_ID1: TableId = TableId::new(233);
    const TEST_TABLE_ID2: TableId = TableId::new(234);

    /// Builds a `PinnedVersion` with id `version_id`. Its state table info maps each given
    /// table to the given committed epoch, all in compaction group 0.
    ///
    /// Only the sender half of a fresh unbounded channel is passed in; the receiver is
    /// dropped immediately.
    fn gen_pin_version(
        version_id: u64,
        table_committed_epoch: impl IntoIterator<Item = (TableId, u64)>,
    ) -> PinnedVersion {
        PinnedVersion::new(
            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
                id: version_id.into(),
                state_table_info: HashMap::from_iter(table_committed_epoch.into_iter().map(
                    |(table_id, committed_epoch)| {
                        (
                            table_id,
                            StateTableInfo {
                                committed_epoch,
                                compaction_group_id: 0.into(),
                            },
                        )
                    },
                )),
                ..Default::default()
            }),
            unbounded_channel().0,
        )
    }

    /// Asserts that `get_safe_version` returns the expected version (compared by id) for every
    /// `(table_id, epoch)` query in `expected`.
    ///
    /// Epoch 0 is also queried on both test tables, and those queries must always return
    /// `None`.
    fn assert_query_equal(
        recent_version: &RecentVersions,
        expected: &[(TableId, u64, Option<&PinnedVersion>)],
    ) {
        for (table_id, epoch, expected_version) in expected
            .iter()
            .cloned()
            .chain([(TEST_TABLE_ID1, 0, None), (TEST_TABLE_ID2, 0, None)])
        {
            let version = recent_version.get_safe_version(table_id, epoch);
            // Report the failing query so a broken expectation is easy to locate.
            assert_eq!(
                version.as_ref().map(|version| version.id()),
                expected_version.map(|version| version.id()),
                "unexpected safe version for table {} at epoch {}",
                table_id,
                epoch
            );
        }
    }

    #[test]
    fn test_basic() {
        let epoch1 = 233;
        let epoch0 = epoch1 - 1;
        let epoch2 = epoch1 + 1;
        let epoch3 = epoch2 + 1;
        let epoch4 = epoch3 + 1;
        let version1 = gen_pin_version(1, [(TEST_TABLE_ID1, epoch1)]);
        // with at most 2 historical versions
        let recent_versions = RecentVersions::new(
            version1.clone(),
            2,
            HummockStateStoreMetrics::unused().into(),
        );
        // The initial version is the latest one and is treated as committed.
        assert!(recent_versions.recent_versions.is_empty());
        assert!(recent_versions.is_latest_committed);
        assert_query_equal(
            &recent_versions,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, Some(&version1)),
                (TEST_TABLE_ID1, epoch2, Some(&version1)),
            ],
        );

        // Version 2 advances no committed epoch. Version 1 (committed) moves into the
        // history, and version 2 is marked as not committed.
        let recent_versions =
            recent_versions.with_new_version(gen_pin_version(2, [(TEST_TABLE_ID1, epoch1)]));
        assert_eq!(recent_versions.recent_versions.len(), 1);
        assert!(!recent_versions.is_latest_committed);

        // Version 3 advances table 1. Version 2 was not committed, so it is dropped rather
        // than kept in the history.
        let version3 = gen_pin_version(3, [(TEST_TABLE_ID1, epoch2)]);
        let recent_versions = recent_versions.with_new_version(version3.clone());
        assert_eq!(recent_versions.recent_versions.len(), 1);
        assert!(recent_versions.is_latest_committed);
        assert_query_equal(
            &recent_versions,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, Some(&version1)),
                (TEST_TABLE_ID1, epoch2, Some(&version3)),
                (TEST_TABLE_ID1, epoch3, Some(&version3)),
            ],
        );

        // Version 4 introduces table 2, which counts as a commit; version 3 joins the history.
        let version4 = gen_pin_version(4, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch2)]);
        let recent_versions = recent_versions.with_new_version(version4.clone());
        assert_eq!(recent_versions.recent_versions.len(), 2);
        assert!(recent_versions.is_latest_committed);
        assert_query_equal(
            &recent_versions,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, Some(&version1)),
                (TEST_TABLE_ID1, epoch2, Some(&version4)),
                (TEST_TABLE_ID1, epoch3, Some(&version4)),
                (TEST_TABLE_ID2, epoch0, None),
                (TEST_TABLE_ID2, epoch1, Some(&version4)),
                (TEST_TABLE_ID2, epoch2, Some(&version4)),
            ],
        );

        // Version 5 advances table 1 again. The history is full, so version 1 is evicted,
        // and table 1 at `epoch1` now has no safe version.
        let version5 = gen_pin_version(5, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch3)]);
        let recent_versions = recent_versions.with_new_version(version5.clone());
        assert_eq!(recent_versions.recent_versions.len(), 2);
        assert!(recent_versions.is_latest_committed);
        assert_query_equal(
            &recent_versions,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, None),
                // Versions 3 and 4 share `epoch2` on table 1; the newer one is expected.
                (TEST_TABLE_ID1, epoch2, Some(&version4)),
                (TEST_TABLE_ID1, epoch3, Some(&version5)),
                (TEST_TABLE_ID1, epoch4, Some(&version5)),
                (TEST_TABLE_ID2, epoch0, None),
                (TEST_TABLE_ID2, epoch1, Some(&version5)),
                (TEST_TABLE_ID2, epoch2, Some(&version5)),
            ],
        );
    }
}