1use std::collections::BTreeMap;
16use std::mem::size_of;
17use std::ops::Deref;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_pb::hummock::{
24 PbBloomFilterType, PbKeyRange, PbSstableFilterType, PbSstableInfo, PbVnodeStatistics,
25 PbVnodeUserKeyRange,
26};
27
28use crate::key::UserKey;
29use crate::key_range::KeyRange;
30use crate::version::{ObjectIdReader, SstableIdReader};
31use crate::{HummockSstableId, HummockSstableObjectId};
32
/// A pair of user keys describing the key range recorded for one vnode.
///
/// The conversion code in this file treats the first element as the minimum key and the
/// second element as the maximum key.
pub type VnodeUserKeyRange = (UserKey<Bytes>, UserKey<Bytes>);
34
/// Per-vnode user key ranges, mirrored to and from `PbVnodeStatistics`.
#[derive(Debug, PartialEq, Clone, Default)]
pub struct VnodeStatistics {
    /// Maps each virtual node to its `(min_key, max_key)` pair, ordered by vnode.
    vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>,
}
40
/// In-memory counterpart of `PbSstableInfo`.
///
/// `Default` is only derived for test builds (`cfg(test)` or the `test` feature).
#[derive(Debug, PartialEq, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct SstableInfoInner {
    /// Identifier exposed through the `ObjectIdReader` implementation.
    pub object_id: HummockSstableObjectId,
    /// Identifier exposed through the `SstableIdReader` implementation.
    pub sst_id: HummockSstableId,
    /// Key range. An absent protobuf `key_range` decodes to `KeyRange::inf()`, and a range
    /// for which `inf_key_range()` holds encodes back to `None`.
    pub key_range: KeyRange,
    pub file_size: u64,
    /// Must be sorted: every protobuf conversion in this file asserts `is_sorted()`.
    pub table_ids: Vec<TableId>,
    pub meta_offset: u64,
    pub stale_key_count: u64,
    pub total_key_count: u64,
    pub min_epoch: u64,
    pub max_epoch: u64,
    pub uncompressed_file_size: u64,
    pub range_tombstone_count: u64,
    /// Decoding from protobuf panics when the raw value is not a known `PbBloomFilterType`.
    pub bloom_filter_kind: PbBloomFilterType,
    /// Unknown raw protobuf values decode as `SstableFilterUnspecified`.
    pub filter_type: PbSstableFilterType,
    /// A protobuf `sst_size` of 0 decodes as `file_size`.
    pub sst_size: u64,
    /// Optional per-vnode user key ranges.
    pub vnode_statistics: Option<VnodeStatistics>,
}
61
62impl SstableInfoInner {
63 pub fn estimated_encode_len(&self) -> usize {
64 let mut basic = size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + self.table_ids.len() * size_of::<u32>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u32>() + size_of::<u32>() + size_of::<u64>(); basic += self.key_range.left.len() + self.key_range.right.len() + size_of::<bool>();
79 if let Some(vnode_statistics) = &self.vnode_statistics {
80 for (min_key, max_key) in vnode_statistics.vnode_user_key_ranges.values() {
81 basic += size_of::<u32>() + min_key.encoded_len() + max_key.encoded_len();
82 }
83 }
84
85 basic
86 }
87
88 pub fn to_protobuf(&self) -> PbSstableInfo {
89 self.into()
90 }
91}
92
93impl From<&PbVnodeStatistics> for VnodeStatistics {
94 fn from(info: &PbVnodeStatistics) -> Self {
95 Self {
96 vnode_user_key_ranges: info
97 .vnode_user_key_ranges
98 .iter()
99 .map(|(&vnode, range)| {
100 let min_key = UserKey::decode(&range.min_key).copy_into();
101 let max_key = UserKey::decode(&range.max_key).copy_into();
102
103 assert_eq!(min_key.table_id, max_key.table_id);
105 assert_eq!(min_key.get_vnode_id(), max_key.get_vnode_id());
106
107 (VirtualNode::from_index(vnode as usize), (min_key, max_key))
108 })
109 .collect(),
110 }
111 }
112}
113
114impl From<PbVnodeStatistics> for VnodeStatistics {
115 fn from(info: PbVnodeStatistics) -> Self {
116 (&info).into()
117 }
118}
119
120impl From<&VnodeStatistics> for PbVnodeStatistics {
121 fn from(info: &VnodeStatistics) -> Self {
122 Self {
123 vnode_user_key_ranges: info
124 .vnode_user_key_ranges
125 .iter()
126 .map(|(vnode, (min_key, max_key))| {
127 (
128 vnode.to_index() as u32,
129 PbVnodeUserKeyRange {
130 min_key: min_key.encode(),
131 max_key: max_key.encode(),
132 },
133 )
134 })
135 .collect(),
136 }
137 }
138}
139
140impl From<VnodeStatistics> for PbVnodeStatistics {
141 fn from(info: VnodeStatistics) -> Self {
142 (&info).into()
143 }
144}
145
146impl From<PbSstableInfo> for SstableInfoInner {
147 fn from(pb_sstable_info: PbSstableInfo) -> Self {
148 assert!(pb_sstable_info.table_ids.is_sorted());
149 Self {
150 object_id: pb_sstable_info.object_id,
151 sst_id: pb_sstable_info.sst_id,
152 key_range: {
153 if let Some(pb_keyrange) = pb_sstable_info.key_range {
155 KeyRange {
156 left: pb_keyrange.left.into(),
157 right: pb_keyrange.right.into(),
158 right_exclusive: pb_keyrange.right_exclusive,
159 }
160 } else {
161 KeyRange::inf()
162 }
163 },
164 file_size: pb_sstable_info.file_size,
165 table_ids: pb_sstable_info.table_ids,
166 meta_offset: pb_sstable_info.meta_offset,
167 stale_key_count: pb_sstable_info.stale_key_count,
168 total_key_count: pb_sstable_info.total_key_count,
169 min_epoch: pb_sstable_info.min_epoch,
170 max_epoch: pb_sstable_info.max_epoch,
171 uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
172 range_tombstone_count: pb_sstable_info.range_tombstone_count,
173 bloom_filter_kind: PbBloomFilterType::try_from(pb_sstable_info.bloom_filter_kind)
174 .unwrap(),
175 filter_type: PbSstableFilterType::try_from(pb_sstable_info.filter_type)
176 .unwrap_or(PbSstableFilterType::SstableFilterUnspecified),
177 sst_size: if pb_sstable_info.sst_size == 0 {
178 pb_sstable_info.file_size
179 } else {
180 pb_sstable_info.sst_size
181 },
182 vnode_statistics: pb_sstable_info
183 .vnode_statistics
184 .as_ref()
185 .map(VnodeStatistics::from),
186 }
187 }
188}
189
190impl From<&PbSstableInfo> for SstableInfoInner {
191 fn from(pb_sstable_info: &PbSstableInfo) -> Self {
192 assert!(pb_sstable_info.table_ids.is_sorted());
193 Self {
194 object_id: pb_sstable_info.object_id,
195 sst_id: pb_sstable_info.sst_id,
196 key_range: {
197 if let Some(pb_keyrange) = &pb_sstable_info.key_range {
198 KeyRange {
199 left: pb_keyrange.left.clone().into(),
200 right: pb_keyrange.right.clone().into(),
201 right_exclusive: pb_keyrange.right_exclusive,
202 }
203 } else {
204 KeyRange::inf()
205 }
206 },
207 file_size: pb_sstable_info.file_size,
208 table_ids: pb_sstable_info.table_ids.clone(),
209 meta_offset: pb_sstable_info.meta_offset,
210 stale_key_count: pb_sstable_info.stale_key_count,
211 total_key_count: pb_sstable_info.total_key_count,
212 min_epoch: pb_sstable_info.min_epoch,
213 max_epoch: pb_sstable_info.max_epoch,
214 uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
215 range_tombstone_count: pb_sstable_info.range_tombstone_count,
216 bloom_filter_kind: PbBloomFilterType::try_from(pb_sstable_info.bloom_filter_kind)
217 .unwrap(),
218 filter_type: PbSstableFilterType::try_from(pb_sstable_info.filter_type)
219 .unwrap_or(PbSstableFilterType::SstableFilterUnspecified),
220 sst_size: if pb_sstable_info.sst_size == 0 {
221 pb_sstable_info.file_size
222 } else {
223 pb_sstable_info.sst_size
224 },
225 vnode_statistics: pb_sstable_info
226 .vnode_statistics
227 .as_ref()
228 .map(VnodeStatistics::from),
229 }
230 }
231}
232
233impl From<SstableInfoInner> for PbSstableInfo {
234 fn from(sstable_info: SstableInfoInner) -> Self {
235 assert!(sstable_info.table_ids.is_sorted());
236 PbSstableInfo {
237 object_id: sstable_info.object_id,
238 sst_id: sstable_info.sst_id,
239 key_range: {
240 let keyrange = sstable_info.key_range;
241 if keyrange.inf_key_range() {
242 None
246 } else {
247 let pb_key_range = PbKeyRange {
248 left: keyrange.left.into(),
249 right: keyrange.right.into(),
250 right_exclusive: keyrange.right_exclusive,
251 };
252 Some(pb_key_range)
253 }
254 },
255
256 file_size: sstable_info.file_size,
257 table_ids: sstable_info.table_ids,
258 meta_offset: sstable_info.meta_offset,
259 stale_key_count: sstable_info.stale_key_count,
260 total_key_count: sstable_info.total_key_count,
261 min_epoch: sstable_info.min_epoch,
262 max_epoch: sstable_info.max_epoch,
263 uncompressed_file_size: sstable_info.uncompressed_file_size,
264 range_tombstone_count: sstable_info.range_tombstone_count,
265 bloom_filter_kind: sstable_info.bloom_filter_kind.into(),
266 filter_type: sstable_info.filter_type.into(),
267 sst_size: sstable_info.sst_size,
268 vnode_statistics: sstable_info
269 .vnode_statistics
270 .as_ref()
271 .map(PbVnodeStatistics::from),
272 }
273 }
274}
275
276impl From<&SstableInfoInner> for PbSstableInfo {
277 fn from(sstable_info: &SstableInfoInner) -> Self {
278 assert!(sstable_info.table_ids.is_sorted());
279 PbSstableInfo {
280 object_id: sstable_info.object_id,
281 sst_id: sstable_info.sst_id,
282 key_range: {
283 let keyrange = &sstable_info.key_range;
284 if keyrange.inf_key_range() {
285 None
286 } else {
287 let pb_key_range = PbKeyRange {
288 left: keyrange.left.to_vec(),
289 right: keyrange.right.to_vec(),
290 right_exclusive: keyrange.right_exclusive,
291 };
292 Some(pb_key_range)
293 }
294 },
295
296 file_size: sstable_info.file_size,
297 table_ids: sstable_info.table_ids.clone(),
298 meta_offset: sstable_info.meta_offset,
299 stale_key_count: sstable_info.stale_key_count,
300 total_key_count: sstable_info.total_key_count,
301 min_epoch: sstable_info.min_epoch,
302 max_epoch: sstable_info.max_epoch,
303 uncompressed_file_size: sstable_info.uncompressed_file_size,
304 range_tombstone_count: sstable_info.range_tombstone_count,
305 bloom_filter_kind: sstable_info.bloom_filter_kind.into(),
306 filter_type: sstable_info.filter_type.into(),
307 sst_size: sstable_info.sst_size,
308 vnode_statistics: sstable_info
309 .vnode_statistics
310 .as_ref()
311 .map(PbVnodeStatistics::from),
312 }
313 }
314}
315
316impl VnodeStatistics {
317 pub fn from_map(vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>) -> Self {
318 Self {
319 vnode_user_key_ranges,
320 }
321 }
322
323 pub fn get_vnode_user_key_range(&self, vnode: VirtualNode) -> Option<&VnodeUserKeyRange> {
325 self.vnode_user_key_ranges.get(&vnode)
326 }
327
328 #[cfg(any(test, feature = "test"))]
329 pub fn vnode_user_key_ranges(&self) -> &BTreeMap<VirtualNode, VnodeUserKeyRange> {
330 &self.vnode_user_key_ranges
331 }
332}
333
334impl SstableInfo {
335 pub fn remove_key_range(&mut self) {
336 let mut sst = self.get_inner();
337 sst.key_range = KeyRange::default();
338 *self = sst.into()
339 }
340
341 pub fn filter_type_compatible_with(&self, filter_type: PbSstableFilterType) -> bool {
342 match self.filter_type {
343 PbSstableFilterType::SstableFilterUnspecified => {
344 filter_type == PbSstableFilterType::SstableFilterXor16
345 }
346 ty => ty == filter_type,
347 }
348 }
349}
350
351impl SstableIdReader for SstableInfoInner {
352 fn sst_id(&self) -> HummockSstableId {
353 self.sst_id
354 }
355}
356
357impl ObjectIdReader for SstableInfoInner {
358 fn object_id(&self) -> HummockSstableObjectId {
359 self.object_id
360 }
361}
362
/// Handle to an `SstableInfoInner` stored behind an `Arc`.
///
/// Dereferences to the inner value, so its fields can be read directly. `Default` is only
/// derived for test builds (`cfg(test)` or the `test` feature).
#[derive(Debug, PartialEq, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct SstableInfo(Arc<SstableInfoInner>);
366
367impl From<&PbSstableInfo> for SstableInfo {
368 fn from(s: &PbSstableInfo) -> Self {
369 SstableInfo(SstableInfoInner::from(s).into())
370 }
371}
372
373impl From<PbSstableInfo> for SstableInfo {
374 fn from(s: PbSstableInfo) -> Self {
375 SstableInfo(SstableInfoInner::from(s).into())
376 }
377}
378
379impl From<SstableInfo> for PbSstableInfo {
380 fn from(s: SstableInfo) -> Self {
381 (&s).into()
382 }
383}
384
385impl From<SstableInfoInner> for SstableInfo {
386 fn from(s: SstableInfoInner) -> Self {
387 Self(s.into())
388 }
389}
390
391impl From<&SstableInfo> for PbSstableInfo {
392 fn from(s: &SstableInfo) -> Self {
393 s.0.as_ref().into()
394 }
395}
396
397impl Deref for SstableInfo {
398 type Target = SstableInfoInner;
399
400 fn deref(&self) -> &Self::Target {
401 &self.0
402 }
403}
404
405impl SstableInfo {
406 pub fn get_inner(&self) -> SstableInfoInner {
407 (*self.0).clone()
408 }
409
410 pub fn set_inner(&mut self, inner: SstableInfoInner) {
411 self.0 = Arc::new(inner);
412 }
413}
414
415impl SstableIdReader for SstableInfo {
416 fn sst_id(&self) -> HummockSstableId {
417 self.sst_id
418 }
419}
420
421impl ObjectIdReader for SstableInfo {
422 fn object_id(&self) -> HummockSstableObjectId {
423 self.object_id
424 }
425}