risingwave_storage/hummock/local_version/
recent_versions.rs1use std::cmp::Ordering;
16use std::sync::Arc;
17
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::HummockEpoch;
20
21use crate::hummock::local_version::pinned_version::PinnedVersion;
22use crate::monitor::HummockStateStoreMetrics;
23
24pub struct RecentVersions {
25 latest_version: PinnedVersion,
26 is_latest_committed: bool,
27 recent_versions: Vec<PinnedVersion>, max_version_num: usize,
29 metric: Arc<HummockStateStoreMetrics>,
30}
31
32impl RecentVersions {
33 pub fn new(
34 version: PinnedVersion,
35 max_version_num: usize,
36 metric: Arc<HummockStateStoreMetrics>,
37 ) -> Self {
38 assert!(max_version_num > 0);
39 Self {
40 latest_version: version,
41 is_latest_committed: true, recent_versions: Vec::new(),
43 max_version_num,
44 metric,
45 }
46 }
47
48 fn has_table_committed(&self, new_version: &PinnedVersion) -> bool {
49 let mut has_table_committed = false;
50 for (table_id, info) in new_version.state_table_info.info() {
51 if let Some(prev_info) = self.latest_version.state_table_info.info().get(table_id) {
52 match info.committed_epoch.cmp(&prev_info.committed_epoch) {
53 Ordering::Less => {
54 unreachable!(
55 "table {} has regress committed epoch {}, prev committed epoch {}",
56 table_id, info.committed_epoch, prev_info.committed_epoch
57 );
58 }
59 Ordering::Equal => {}
60 Ordering::Greater => {
61 has_table_committed = true;
62 }
63 }
64 } else {
65 has_table_committed = true;
66 }
67 }
68 has_table_committed
69 }
70
71 #[must_use]
72 pub fn with_new_version(&self, version: PinnedVersion) -> Self {
73 assert!(version.id > self.latest_version.id);
74 let is_committed = self.has_table_committed(&version);
75 let recent_versions = if self.is_latest_committed {
76 let prev_recent_versions = if self.recent_versions.len() >= self.max_version_num {
77 assert_eq!(self.recent_versions.len(), self.max_version_num);
78 &self.recent_versions[1..]
79 } else {
80 &self.recent_versions[..]
81 };
82 let mut recent_versions = Vec::with_capacity(prev_recent_versions.len() + 1);
83 recent_versions.extend(prev_recent_versions.iter().cloned());
84 recent_versions.push(self.latest_version.clone());
85 recent_versions
86 } else {
87 self.recent_versions.clone()
88 };
89 Self {
90 latest_version: version,
91 is_latest_committed: is_committed,
92 recent_versions,
93 max_version_num: self.max_version_num,
94 metric: self.metric.clone(),
95 }
96 }
97
98 pub fn latest_version(&self) -> &PinnedVersion {
99 &self.latest_version
100 }
101
102 pub fn get_safe_version(
106 &self,
107 table_id: TableId,
108 epoch: HummockEpoch,
109 ) -> Option<PinnedVersion> {
110 let result = if let Some(info) = self.latest_version.state_table_info.info().get(&table_id)
111 {
112 if info.committed_epoch <= epoch {
113 Some(self.latest_version.clone())
114 } else {
115 self.get_safe_version_from_recent(table_id, epoch)
116 }
117 } else {
118 None
119 };
120 if result.is_some() {
121 self.metric.safe_version_hit.inc();
122 } else {
123 self.metric.safe_version_miss.inc();
124 }
125 result
126 }
127
128 fn get_safe_version_from_recent(
129 &self,
130 table_id: TableId,
131 epoch: HummockEpoch,
132 ) -> Option<PinnedVersion> {
133 if cfg!(debug_assertions) {
134 assert!(
135 epoch
136 < self
137 .latest_version
138 .state_table_info
139 .info()
140 .get(&table_id)
141 .expect("should exist")
142 .committed_epoch
143 );
144 }
145 let result = self.recent_versions.binary_search_by(|version| {
146 let committed_epoch = version.table_committed_epoch(table_id);
147 if let Some(committed_epoch) = committed_epoch {
148 committed_epoch.cmp(&epoch)
149 } else {
150 Ordering::Less
153 }
154 });
155 match result {
156 Ok(index) => Some(self.recent_versions[index].clone()),
157 Err(index) => {
158 let version = if index >= self.recent_versions.len() {
161 assert_eq!(index, self.recent_versions.len());
162 self.recent_versions.last().cloned()
163 } else if index == 0 {
164 None
166 } else {
167 self.recent_versions.get(index - 1).cloned()
168 };
169 version.and_then(|version| {
170 if version.state_table_info.info().contains_key(&table_id) {
171 Some(version)
172 } else {
173 None
175 }
176 })
177 }
178 }
179 }
180}
181
182#[cfg(test)]
183mod tests {
184 use std::collections::HashMap;
185
186 use risingwave_common::catalog::TableId;
187 use risingwave_hummock_sdk::version::HummockVersion;
188 use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
189 use tokio::sync::mpsc::unbounded_channel;
190
191 use crate::hummock::local_version::pinned_version::PinnedVersion;
192 use crate::hummock::local_version::recent_versions::RecentVersions;
193 use crate::monitor::HummockStateStoreMetrics;
194
195 const TEST_TABLE_ID1: TableId = TableId::new(233);
196 const TEST_TABLE_ID2: TableId = TableId::new(234);
197
198 fn gen_pin_version(
199 version_id: u64,
200 table_committed_epoch: impl IntoIterator<Item = (TableId, u64)>,
201 ) -> PinnedVersion {
202 PinnedVersion::new(
203 HummockVersion::from_rpc_protobuf(&PbHummockVersion {
204 id: version_id,
205 state_table_info: HashMap::from_iter(table_committed_epoch.into_iter().map(
206 |(table_id, committed_epoch)| {
207 (
208 table_id.table_id,
209 StateTableInfo {
210 committed_epoch,
211 compaction_group_id: 0,
212 },
213 )
214 },
215 )),
216 ..Default::default()
217 }),
218 unbounded_channel().0,
219 )
220 }
221
222 fn assert_query_equal(
223 recent_version: &RecentVersions,
224 expected: &[(TableId, u64, Option<&PinnedVersion>)],
225 ) {
226 for (table_id, epoch, expected_version) in expected
227 .iter()
228 .cloned()
229 .chain([(TEST_TABLE_ID1, 0, None), (TEST_TABLE_ID2, 0, None)])
230 {
231 let version = recent_version.get_safe_version(table_id, epoch);
232 assert_eq!(
233 version.as_ref().map(|version| version.id()),
234 expected_version.map(|version| version.id())
235 );
236 }
237 }
238
239 #[test]
240 fn test_basic() {
241 let epoch1 = 233;
242 let epoch0 = epoch1 - 1;
243 let epoch2 = epoch1 + 1;
244 let epoch3 = epoch2 + 1;
245 let epoch4 = epoch3 + 1;
246 let version1 = gen_pin_version(1, [(TEST_TABLE_ID1, epoch1)]);
247 let recent_versions = RecentVersions::new(
249 version1.clone(),
250 2,
251 HummockStateStoreMetrics::unused().into(),
252 );
253 assert!(recent_versions.recent_versions.is_empty());
254 assert!(recent_versions.is_latest_committed);
255 assert_query_equal(
256 &recent_versions,
257 &[
258 (TEST_TABLE_ID1, epoch0, None),
259 (TEST_TABLE_ID1, epoch1, Some(&version1)),
260 (TEST_TABLE_ID1, epoch2, Some(&version1)),
261 ],
262 );
263
264 let recent_versions =
265 recent_versions.with_new_version(gen_pin_version(2, [(TEST_TABLE_ID1, epoch1)]));
266 assert_eq!(recent_versions.recent_versions.len(), 1);
267 assert!(!recent_versions.is_latest_committed);
268
269 let version3 = gen_pin_version(3, [(TEST_TABLE_ID1, epoch2)]);
270 let recent_versions = recent_versions.with_new_version(version3.clone());
271 assert_eq!(recent_versions.recent_versions.len(), 1);
272 assert!(recent_versions.is_latest_committed);
273 assert_query_equal(
274 &recent_versions,
275 &[
276 (TEST_TABLE_ID1, epoch0, None),
277 (TEST_TABLE_ID1, epoch1, Some(&version1)),
278 (TEST_TABLE_ID1, epoch2, Some(&version3)),
279 (TEST_TABLE_ID1, epoch3, Some(&version3)),
280 ],
281 );
282
283 let version4 = gen_pin_version(4, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch2)]);
284 let recent_versions = recent_versions.with_new_version(version4.clone());
285 assert_eq!(recent_versions.recent_versions.len(), 2);
286 assert!(recent_versions.is_latest_committed);
287 assert_query_equal(
288 &recent_versions,
289 &[
290 (TEST_TABLE_ID1, epoch0, None),
291 (TEST_TABLE_ID1, epoch1, Some(&version1)),
292 (TEST_TABLE_ID1, epoch2, Some(&version4)),
293 (TEST_TABLE_ID1, epoch3, Some(&version4)),
294 (TEST_TABLE_ID2, epoch0, None),
295 (TEST_TABLE_ID2, epoch1, Some(&version4)),
296 (TEST_TABLE_ID2, epoch2, Some(&version4)),
297 ],
298 );
299
300 let version5 = gen_pin_version(5, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch3)]);
301 let recent_versions = recent_versions.with_new_version(version5.clone());
302 assert_eq!(recent_versions.recent_versions.len(), 2);
303 assert!(recent_versions.is_latest_committed);
304 assert_query_equal(
305 &recent_versions,
306 &[
307 (TEST_TABLE_ID1, epoch0, None),
308 (TEST_TABLE_ID1, epoch1, None),
309 (TEST_TABLE_ID1, epoch2, Some(&version4)),
310 (TEST_TABLE_ID1, epoch3, Some(&version5)),
311 (TEST_TABLE_ID1, epoch4, Some(&version5)),
312 (TEST_TABLE_ID2, epoch0, None),
313 (TEST_TABLE_ID2, epoch1, Some(&version5)),
314 (TEST_TABLE_ID2, epoch2, Some(&version5)),
315 ],
316 );
317 }
318}