risingwave_storage/hummock/local_version/
recent_versions.rs1use std::cmp::Ordering;
16use std::sync::Arc;
17
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::HummockEpoch;
20
21use crate::hummock::local_version::pinned_version::PinnedVersion;
22use crate::monitor::HummockStateStoreMetrics;
23
24pub struct RecentVersions {
25 latest_version: PinnedVersion,
26 is_latest_committed: bool,
27 recent_versions: Vec<PinnedVersion>, max_version_num: usize,
29 metric: Arc<HummockStateStoreMetrics>,
30}
31
32impl RecentVersions {
33 pub fn new(
34 version: PinnedVersion,
35 max_version_num: usize,
36 metric: Arc<HummockStateStoreMetrics>,
37 ) -> Self {
38 assert!(max_version_num > 0);
39 Self {
40 latest_version: version,
41 is_latest_committed: true, recent_versions: Vec::new(),
43 max_version_num,
44 metric,
45 }
46 }
47
48 fn has_table_committed(&self, new_version: &PinnedVersion) -> bool {
49 let mut has_table_committed = false;
50 for (table_id, info) in new_version.state_table_info.info() {
51 if let Some(prev_info) = self.latest_version.state_table_info.info().get(table_id) {
52 match info.committed_epoch.cmp(&prev_info.committed_epoch) {
53 Ordering::Less => {
54 unreachable!(
55 "table {} has regress committed epoch {}, prev committed epoch {}",
56 table_id, info.committed_epoch, prev_info.committed_epoch
57 );
58 }
59 Ordering::Equal => {}
60 Ordering::Greater => {
61 has_table_committed = true;
62 }
63 }
64 } else {
65 has_table_committed = true;
66 }
67 }
68 has_table_committed
69 }
70
71 #[must_use]
72 pub fn with_new_version(&self, version: PinnedVersion) -> Self {
73 assert!(version.id > self.latest_version.id);
74 let is_committed = self.has_table_committed(&version);
75 let recent_versions = if self.is_latest_committed {
76 let prev_recent_versions = if self.recent_versions.len() >= self.max_version_num {
77 assert_eq!(self.recent_versions.len(), self.max_version_num);
78 &self.recent_versions[1..]
79 } else {
80 &self.recent_versions[..]
81 };
82 let mut recent_versions = Vec::with_capacity(prev_recent_versions.len() + 1);
83 recent_versions.extend(prev_recent_versions.iter().cloned());
84 recent_versions.push(self.latest_version.clone());
85 recent_versions
86 } else {
87 self.recent_versions.clone()
88 };
89 Self {
90 latest_version: version,
91 is_latest_committed: is_committed,
92 recent_versions,
93 max_version_num: self.max_version_num,
94 metric: self.metric.clone(),
95 }
96 }
97
98 pub fn latest_version(&self) -> &PinnedVersion {
99 &self.latest_version
100 }
101
102 pub fn get_safe_version(
106 &self,
107 table_id: TableId,
108 epoch: HummockEpoch,
109 ) -> Option<PinnedVersion> {
110 let result = if let Some(info) = self.latest_version.state_table_info.info().get(&table_id)
111 {
112 if info.committed_epoch <= epoch {
113 Some(self.latest_version.clone())
114 } else {
115 self.get_safe_version_from_recent(table_id, epoch)
116 }
117 } else {
118 None
119 };
120 if result.is_some() {
121 self.metric.safe_version_hit.inc();
122 } else {
123 self.metric.safe_version_miss.inc();
124 }
125 result
126 }
127
128 fn get_safe_version_from_recent(
129 &self,
130 table_id: TableId,
131 epoch: HummockEpoch,
132 ) -> Option<PinnedVersion> {
133 if cfg!(debug_assertions) {
134 assert!(
135 epoch
136 < self
137 .latest_version
138 .state_table_info
139 .info()
140 .get(&table_id)
141 .expect("should exist")
142 .committed_epoch
143 );
144 }
145 let result = self.recent_versions.binary_search_by(|version| {
146 let committed_epoch = version.table_committed_epoch(table_id);
147 if let Some(committed_epoch) = committed_epoch {
148 committed_epoch.cmp(&epoch)
149 } else {
150 Ordering::Less
153 }
154 });
155 match result {
156 Ok(index) => Some(self.recent_versions[index].clone()),
157 Err(index) => {
158 let version = if index >= self.recent_versions.len() {
161 assert_eq!(index, self.recent_versions.len());
162 self.recent_versions.last().cloned()
163 } else if index == 0 {
164 None
166 } else {
167 self.recent_versions.get(index - 1).cloned()
168 };
169 version.filter(|version| version.state_table_info.info().contains_key(&table_id))
171 }
172 }
173 }
174}
175
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
    use tokio::sync::mpsc::unbounded_channel;

    use crate::hummock::local_version::pinned_version::PinnedVersion;
    use crate::hummock::local_version::recent_versions::RecentVersions;
    use crate::monitor::HummockStateStoreMetrics;

    const TEST_TABLE_ID1: TableId = TableId::new(233);
    const TEST_TABLE_ID2: TableId = TableId::new(234);

    /// Builds a pinned version with the given id whose state table info holds
    /// exactly the given `(table, committed epoch)` pairs.
    fn gen_pin_version(
        version_id: u64,
        table_committed_epoch: impl IntoIterator<Item = (TableId, u64)>,
    ) -> PinnedVersion {
        let state_table_info: HashMap<_, _> = table_committed_epoch
            .into_iter()
            .map(|(table_id, committed_epoch)| {
                let info = StateTableInfo {
                    committed_epoch,
                    compaction_group_id: 0.into(),
                };
                (table_id, info)
            })
            .collect();
        let pb_version = PbHummockVersion {
            id: version_id.into(),
            state_table_info,
            ..Default::default()
        };
        PinnedVersion::new(
            HummockVersion::from_rpc_protobuf(&pb_version),
            unbounded_channel().0,
        )
    }

    /// Checks every `(table, epoch, expected version)` query against
    /// `get_safe_version`, comparing versions by id. Epoch 0 is additionally
    /// queried for both test tables and expected to yield no version.
    fn assert_query_equal(
        recent_version: &RecentVersions,
        expected: &[(TableId, u64, Option<&PinnedVersion>)],
    ) {
        let epoch_zero_queries = [(TEST_TABLE_ID1, 0, None), (TEST_TABLE_ID2, 0, None)];
        let queries = expected.iter().cloned().chain(epoch_zero_queries);
        for (table_id, epoch, expected_version) in queries {
            let actual = recent_version.get_safe_version(table_id, epoch);
            let actual_id = actual.as_ref().map(|found| found.id());
            let expected_id = expected_version.map(|wanted| wanted.id());
            assert_eq!(actual_id, expected_id);
        }
    }

    #[test]
    fn test_basic() {
        let epoch0 = 232;
        let epoch1 = epoch0 + 1;
        let epoch2 = epoch1 + 1;
        let epoch3 = epoch2 + 1;
        let epoch4 = epoch3 + 1;

        // Initial state: only version 1 is known and the history is empty.
        let version1 = gen_pin_version(1, [(TEST_TABLE_ID1, epoch1)]);
        let history = RecentVersions::new(
            version1.clone(),
            2,
            HummockStateStoreMetrics::unused().into(),
        );
        assert!(history.recent_versions.is_empty());
        assert!(history.is_latest_committed);
        assert_query_equal(
            &history,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, Some(&version1)),
                (TEST_TABLE_ID1, epoch2, Some(&version1)),
            ],
        );

        // Version 2 repeats the committed epoch of version 1.
        let version2 = gen_pin_version(2, [(TEST_TABLE_ID1, epoch1)]);
        let history = history.with_new_version(version2);
        assert_eq!(history.recent_versions.len(), 1);
        assert!(!history.is_latest_committed);

        // Version 3 advances table 1 to epoch 2.
        let version3 = gen_pin_version(3, [(TEST_TABLE_ID1, epoch2)]);
        let history = history.with_new_version(version3.clone());
        assert_eq!(history.recent_versions.len(), 1);
        assert!(history.is_latest_committed);
        assert_query_equal(
            &history,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, Some(&version1)),
                (TEST_TABLE_ID1, epoch2, Some(&version3)),
                (TEST_TABLE_ID1, epoch3, Some(&version3)),
            ],
        );

        // Version 4 introduces table 2 while table 1 stays at epoch 2.
        let version4 = gen_pin_version(4, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch2)]);
        let history = history.with_new_version(version4.clone());
        assert_eq!(history.recent_versions.len(), 2);
        assert!(history.is_latest_committed);
        assert_query_equal(
            &history,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, Some(&version1)),
                (TEST_TABLE_ID1, epoch2, Some(&version4)),
                (TEST_TABLE_ID1, epoch3, Some(&version4)),
                (TEST_TABLE_ID2, epoch0, None),
                (TEST_TABLE_ID2, epoch1, Some(&version4)),
                (TEST_TABLE_ID2, epoch2, Some(&version4)),
            ],
        );

        // Version 5 advances table 1 to epoch 3; the history stays at its cap of 2.
        let version5 = gen_pin_version(5, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch3)]);
        let history = history.with_new_version(version5.clone());
        assert_eq!(history.recent_versions.len(), 2);
        assert!(history.is_latest_committed);
        assert_query_equal(
            &history,
            &[
                (TEST_TABLE_ID1, epoch0, None),
                (TEST_TABLE_ID1, epoch1, None),
                (TEST_TABLE_ID1, epoch2, Some(&version4)),
                (TEST_TABLE_ID1, epoch3, Some(&version5)),
                (TEST_TABLE_ID1, epoch4, Some(&version5)),
                (TEST_TABLE_ID2, epoch0, None),
                (TEST_TABLE_ID2, epoch1, Some(&version5)),
                (TEST_TABLE_ID2, epoch2, Some(&version5)),
            ],
        );
    }
}