1use std::collections::BTreeMap;
16use std::mem::size_of;
17use std::ops::Deref;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_pb::hummock::{
24 PbBloomFilterType, PbKeyRange, PbSstableInfo, PbVnodeStatistics, PbVnodeUserKeyRange,
25};
26
27use crate::key::UserKey;
28use crate::key_range::KeyRange;
29use crate::version::{ObjectIdReader, SstableIdReader};
30use crate::{HummockSstableId, HummockSstableObjectId};
31
32pub type VnodeUserKeyRange = (UserKey<Bytes>, UserKey<Bytes>);
33
34#[derive(Debug, PartialEq, Clone, Default)]
35pub struct VnodeStatistics {
36 vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>,
38}
39
40#[derive(Debug, PartialEq, Clone)]
41#[cfg_attr(any(test, feature = "test"), derive(Default))]
42pub struct SstableInfoInner {
43 pub object_id: HummockSstableObjectId,
44 pub sst_id: HummockSstableId,
45 pub key_range: KeyRange,
46 pub file_size: u64,
47 pub table_ids: Vec<TableId>,
48 pub meta_offset: u64,
49 pub stale_key_count: u64,
50 pub total_key_count: u64,
51 pub min_epoch: u64,
52 pub max_epoch: u64,
53 pub uncompressed_file_size: u64,
54 pub range_tombstone_count: u64,
55 pub bloom_filter_kind: PbBloomFilterType,
56 pub sst_size: u64,
57 pub vnode_statistics: Option<VnodeStatistics>,
58}
59
60impl SstableInfoInner {
61 pub fn estimated_encode_len(&self) -> usize {
62 let mut basic = size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + self.table_ids.len() * size_of::<u32>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u32>() + size_of::<u64>(); basic += self.key_range.left.len() + self.key_range.right.len() + size_of::<bool>();
76 if let Some(vnode_statistics) = &self.vnode_statistics {
77 for (min_key, max_key) in vnode_statistics.vnode_user_key_ranges.values() {
78 basic += size_of::<u32>() + min_key.encoded_len() + max_key.encoded_len();
79 }
80 }
81
82 basic
83 }
84
85 pub fn to_protobuf(&self) -> PbSstableInfo {
86 self.into()
87 }
88}
89
90impl From<&PbVnodeStatistics> for VnodeStatistics {
91 fn from(info: &PbVnodeStatistics) -> Self {
92 Self {
93 vnode_user_key_ranges: info
94 .vnode_user_key_ranges
95 .iter()
96 .map(|(&vnode, range)| {
97 let min_key = UserKey::decode(&range.min_key).copy_into();
98 let max_key = UserKey::decode(&range.max_key).copy_into();
99
100 assert_eq!(min_key.table_id, max_key.table_id);
102 assert_eq!(min_key.get_vnode_id(), max_key.get_vnode_id());
103
104 (VirtualNode::from_index(vnode as usize), (min_key, max_key))
105 })
106 .collect(),
107 }
108 }
109}
110
111impl From<PbVnodeStatistics> for VnodeStatistics {
112 fn from(info: PbVnodeStatistics) -> Self {
113 (&info).into()
114 }
115}
116
117impl From<&VnodeStatistics> for PbVnodeStatistics {
118 fn from(info: &VnodeStatistics) -> Self {
119 Self {
120 vnode_user_key_ranges: info
121 .vnode_user_key_ranges
122 .iter()
123 .map(|(vnode, (min_key, max_key))| {
124 (
125 vnode.to_index() as u32,
126 PbVnodeUserKeyRange {
127 min_key: min_key.encode(),
128 max_key: max_key.encode(),
129 },
130 )
131 })
132 .collect(),
133 }
134 }
135}
136
137impl From<VnodeStatistics> for PbVnodeStatistics {
138 fn from(info: VnodeStatistics) -> Self {
139 (&info).into()
140 }
141}
142
143impl From<PbSstableInfo> for SstableInfoInner {
144 fn from(pb_sstable_info: PbSstableInfo) -> Self {
145 assert!(pb_sstable_info.table_ids.is_sorted());
146 Self {
147 object_id: pb_sstable_info.object_id,
148 sst_id: pb_sstable_info.sst_id,
149 key_range: {
150 if let Some(pb_keyrange) = pb_sstable_info.key_range {
152 KeyRange {
153 left: pb_keyrange.left.into(),
154 right: pb_keyrange.right.into(),
155 right_exclusive: pb_keyrange.right_exclusive,
156 }
157 } else {
158 KeyRange::inf()
159 }
160 },
161 file_size: pb_sstable_info.file_size,
162 table_ids: pb_sstable_info.table_ids,
163 meta_offset: pb_sstable_info.meta_offset,
164 stale_key_count: pb_sstable_info.stale_key_count,
165 total_key_count: pb_sstable_info.total_key_count,
166 min_epoch: pb_sstable_info.min_epoch,
167 max_epoch: pb_sstable_info.max_epoch,
168 uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
169 range_tombstone_count: pb_sstable_info.range_tombstone_count,
170 bloom_filter_kind: PbBloomFilterType::try_from(pb_sstable_info.bloom_filter_kind)
171 .unwrap(),
172 sst_size: if pb_sstable_info.sst_size == 0 {
173 pb_sstable_info.file_size
174 } else {
175 pb_sstable_info.sst_size
176 },
177 vnode_statistics: pb_sstable_info
178 .vnode_statistics
179 .as_ref()
180 .map(VnodeStatistics::from),
181 }
182 }
183}
184
185impl From<&PbSstableInfo> for SstableInfoInner {
186 fn from(pb_sstable_info: &PbSstableInfo) -> Self {
187 assert!(pb_sstable_info.table_ids.is_sorted());
188 Self {
189 object_id: pb_sstable_info.object_id,
190 sst_id: pb_sstable_info.sst_id,
191 key_range: {
192 if let Some(pb_keyrange) = &pb_sstable_info.key_range {
193 KeyRange {
194 left: pb_keyrange.left.clone().into(),
195 right: pb_keyrange.right.clone().into(),
196 right_exclusive: pb_keyrange.right_exclusive,
197 }
198 } else {
199 KeyRange::inf()
200 }
201 },
202 file_size: pb_sstable_info.file_size,
203 table_ids: pb_sstable_info.table_ids.clone(),
204 meta_offset: pb_sstable_info.meta_offset,
205 stale_key_count: pb_sstable_info.stale_key_count,
206 total_key_count: pb_sstable_info.total_key_count,
207 min_epoch: pb_sstable_info.min_epoch,
208 max_epoch: pb_sstable_info.max_epoch,
209 uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
210 range_tombstone_count: pb_sstable_info.range_tombstone_count,
211 bloom_filter_kind: PbBloomFilterType::try_from(pb_sstable_info.bloom_filter_kind)
212 .unwrap(),
213 sst_size: if pb_sstable_info.sst_size == 0 {
214 pb_sstable_info.file_size
215 } else {
216 pb_sstable_info.sst_size
217 },
218 vnode_statistics: pb_sstable_info
219 .vnode_statistics
220 .as_ref()
221 .map(VnodeStatistics::from),
222 }
223 }
224}
225
226impl From<SstableInfoInner> for PbSstableInfo {
227 fn from(sstable_info: SstableInfoInner) -> Self {
228 assert!(sstable_info.table_ids.is_sorted());
229 PbSstableInfo {
230 object_id: sstable_info.object_id,
231 sst_id: sstable_info.sst_id,
232 key_range: {
233 let keyrange = sstable_info.key_range;
234 if keyrange.inf_key_range() {
235 None
239 } else {
240 let pb_key_range = PbKeyRange {
241 left: keyrange.left.into(),
242 right: keyrange.right.into(),
243 right_exclusive: keyrange.right_exclusive,
244 };
245 Some(pb_key_range)
246 }
247 },
248
249 file_size: sstable_info.file_size,
250 table_ids: sstable_info.table_ids,
251 meta_offset: sstable_info.meta_offset,
252 stale_key_count: sstable_info.stale_key_count,
253 total_key_count: sstable_info.total_key_count,
254 min_epoch: sstable_info.min_epoch,
255 max_epoch: sstable_info.max_epoch,
256 uncompressed_file_size: sstable_info.uncompressed_file_size,
257 range_tombstone_count: sstable_info.range_tombstone_count,
258 bloom_filter_kind: sstable_info.bloom_filter_kind.into(),
259 sst_size: sstable_info.sst_size,
260 vnode_statistics: sstable_info
261 .vnode_statistics
262 .as_ref()
263 .map(PbVnodeStatistics::from),
264 }
265 }
266}
267
268impl From<&SstableInfoInner> for PbSstableInfo {
269 fn from(sstable_info: &SstableInfoInner) -> Self {
270 assert!(sstable_info.table_ids.is_sorted());
271 PbSstableInfo {
272 object_id: sstable_info.object_id,
273 sst_id: sstable_info.sst_id,
274 key_range: {
275 let keyrange = &sstable_info.key_range;
276 if keyrange.inf_key_range() {
277 None
278 } else {
279 let pb_key_range = PbKeyRange {
280 left: keyrange.left.to_vec(),
281 right: keyrange.right.to_vec(),
282 right_exclusive: keyrange.right_exclusive,
283 };
284 Some(pb_key_range)
285 }
286 },
287
288 file_size: sstable_info.file_size,
289 table_ids: sstable_info.table_ids.clone(),
290 meta_offset: sstable_info.meta_offset,
291 stale_key_count: sstable_info.stale_key_count,
292 total_key_count: sstable_info.total_key_count,
293 min_epoch: sstable_info.min_epoch,
294 max_epoch: sstable_info.max_epoch,
295 uncompressed_file_size: sstable_info.uncompressed_file_size,
296 range_tombstone_count: sstable_info.range_tombstone_count,
297 bloom_filter_kind: sstable_info.bloom_filter_kind.into(),
298 sst_size: sstable_info.sst_size,
299 vnode_statistics: sstable_info
300 .vnode_statistics
301 .as_ref()
302 .map(PbVnodeStatistics::from),
303 }
304 }
305}
306
307impl VnodeStatistics {
308 pub fn from_map(vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>) -> Self {
309 Self {
310 vnode_user_key_ranges,
311 }
312 }
313
314 pub fn get_vnode_user_key_range(&self, vnode: VirtualNode) -> Option<&VnodeUserKeyRange> {
316 self.vnode_user_key_ranges.get(&vnode)
317 }
318
319 #[cfg(any(test, feature = "test"))]
320 pub fn vnode_user_key_ranges(&self) -> &BTreeMap<VirtualNode, VnodeUserKeyRange> {
321 &self.vnode_user_key_ranges
322 }
323}
324
325impl SstableInfo {
326 pub fn remove_key_range(&mut self) {
327 let mut sst = self.get_inner();
328 sst.key_range = KeyRange::default();
329 *self = sst.into()
330 }
331}
332
333impl SstableIdReader for SstableInfoInner {
334 fn sst_id(&self) -> HummockSstableId {
335 self.sst_id
336 }
337}
338
339impl ObjectIdReader for SstableInfoInner {
340 fn object_id(&self) -> HummockSstableObjectId {
341 self.object_id
342 }
343}
344
345#[derive(Debug, PartialEq, Clone)]
346#[cfg_attr(any(test, feature = "test"), derive(Default))]
347pub struct SstableInfo(Arc<SstableInfoInner>);
348
349impl From<&PbSstableInfo> for SstableInfo {
350 fn from(s: &PbSstableInfo) -> Self {
351 SstableInfo(SstableInfoInner::from(s).into())
352 }
353}
354
355impl From<PbSstableInfo> for SstableInfo {
356 fn from(s: PbSstableInfo) -> Self {
357 SstableInfo(SstableInfoInner::from(s).into())
358 }
359}
360
361impl From<SstableInfo> for PbSstableInfo {
362 fn from(s: SstableInfo) -> Self {
363 (&s).into()
364 }
365}
366
367impl From<SstableInfoInner> for SstableInfo {
368 fn from(s: SstableInfoInner) -> Self {
369 Self(s.into())
370 }
371}
372
373impl From<&SstableInfo> for PbSstableInfo {
374 fn from(s: &SstableInfo) -> Self {
375 s.0.as_ref().into()
376 }
377}
378
379impl Deref for SstableInfo {
380 type Target = SstableInfoInner;
381
382 fn deref(&self) -> &Self::Target {
383 &self.0
384 }
385}
386
387impl SstableInfo {
388 pub fn get_inner(&self) -> SstableInfoInner {
389 (*self.0).clone()
390 }
391
392 pub fn set_inner(&mut self, inner: SstableInfoInner) {
393 self.0 = Arc::new(inner);
394 }
395}
396
397impl SstableIdReader for SstableInfo {
398 fn sst_id(&self) -> HummockSstableId {
399 self.sst_id
400 }
401}
402
403impl ObjectIdReader for SstableInfo {
404 fn object_id(&self) -> HummockSstableObjectId {
405 self.object_id
406 }
407}