risingwave_hummock_sdk/
sstable_info.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::mem::size_of;
17use std::ops::Deref;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_pb::hummock::{
24    PbBloomFilterType, PbKeyRange, PbSstableInfo, PbVnodeStatistics, PbVnodeUserKeyRange,
25};
26
27use crate::key::UserKey;
28use crate::key_range::KeyRange;
29use crate::version::{ObjectIdReader, SstableIdReader};
30use crate::{HummockSstableId, HummockSstableObjectId};
31
/// A `(min_user_key, max_user_key)` pair of user keys recorded for a single vnode.
pub type VnodeUserKeyRange = (UserKey<Bytes>, UserKey<Bytes>);
33
34#[derive(Debug, PartialEq, Clone, Default)]
35pub struct VnodeStatistics {
36    /// Per-vnode user key ranges as closed intervals: [`min_user_key`, `max_user_key`].
37    vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>,
38}
39
40#[derive(Debug, PartialEq, Clone)]
41#[cfg_attr(any(test, feature = "test"), derive(Default))]
42pub struct SstableInfoInner {
43    pub object_id: HummockSstableObjectId,
44    pub sst_id: HummockSstableId,
45    pub key_range: KeyRange,
46    pub file_size: u64,
47    pub table_ids: Vec<TableId>,
48    pub meta_offset: u64,
49    pub stale_key_count: u64,
50    pub total_key_count: u64,
51    pub min_epoch: u64,
52    pub max_epoch: u64,
53    pub uncompressed_file_size: u64,
54    pub range_tombstone_count: u64,
55    pub bloom_filter_kind: PbBloomFilterType,
56    pub sst_size: u64,
57    pub vnode_statistics: Option<VnodeStatistics>,
58}
59
60impl SstableInfoInner {
61    pub fn estimated_encode_len(&self) -> usize {
62        let mut basic = size_of::<u64>() // object_id
63            + size_of::<u64>() // sstable_id
64            + size_of::<u64>() // file_size
65            + self.table_ids.len() * size_of::<u32>() // table_ids
66            + size_of::<u64>() // meta_offset
67            + size_of::<u64>() // stale_key_count
68            + size_of::<u64>() // total_key_count
69            + size_of::<u64>() // min_epoch
70            + size_of::<u64>() // max_epoch
71            + size_of::<u64>() // uncompressed_file_size
72            + size_of::<u64>() // range_tombstone_count
73            + size_of::<u32>() // bloom_filter_kind
74            + size_of::<u64>(); // sst_size
75        basic += self.key_range.left.len() + self.key_range.right.len() + size_of::<bool>();
76        if let Some(vnode_statistics) = &self.vnode_statistics {
77            for (min_key, max_key) in vnode_statistics.vnode_user_key_ranges.values() {
78                basic += size_of::<u32>() + min_key.encoded_len() + max_key.encoded_len();
79            }
80        }
81
82        basic
83    }
84
85    pub fn to_protobuf(&self) -> PbSstableInfo {
86        self.into()
87    }
88}
89
90impl From<&PbVnodeStatistics> for VnodeStatistics {
91    fn from(info: &PbVnodeStatistics) -> Self {
92        Self {
93            vnode_user_key_ranges: info
94                .vnode_user_key_ranges
95                .iter()
96                .map(|(&vnode, range)| {
97                    let min_key = UserKey::decode(&range.min_key).copy_into();
98                    let max_key = UserKey::decode(&range.max_key).copy_into();
99
100                    // assert shared same vnode and table-id
101                    assert_eq!(min_key.table_id, max_key.table_id);
102                    assert_eq!(min_key.get_vnode_id(), max_key.get_vnode_id());
103
104                    (VirtualNode::from_index(vnode as usize), (min_key, max_key))
105                })
106                .collect(),
107        }
108    }
109}
110
111impl From<PbVnodeStatistics> for VnodeStatistics {
112    fn from(info: PbVnodeStatistics) -> Self {
113        (&info).into()
114    }
115}
116
117impl From<&VnodeStatistics> for PbVnodeStatistics {
118    fn from(info: &VnodeStatistics) -> Self {
119        Self {
120            vnode_user_key_ranges: info
121                .vnode_user_key_ranges
122                .iter()
123                .map(|(vnode, (min_key, max_key))| {
124                    (
125                        vnode.to_index() as u32,
126                        PbVnodeUserKeyRange {
127                            min_key: min_key.encode(),
128                            max_key: max_key.encode(),
129                        },
130                    )
131                })
132                .collect(),
133        }
134    }
135}
136
137impl From<VnodeStatistics> for PbVnodeStatistics {
138    fn from(info: VnodeStatistics) -> Self {
139        (&info).into()
140    }
141}
142
143impl From<PbSstableInfo> for SstableInfoInner {
144    fn from(pb_sstable_info: PbSstableInfo) -> Self {
145        assert!(pb_sstable_info.table_ids.is_sorted());
146        Self {
147            object_id: pb_sstable_info.object_id,
148            sst_id: pb_sstable_info.sst_id,
149            key_range: {
150                // Due to the stripped key range, the key range may be `None`
151                if let Some(pb_keyrange) = pb_sstable_info.key_range {
152                    KeyRange {
153                        left: pb_keyrange.left.into(),
154                        right: pb_keyrange.right.into(),
155                        right_exclusive: pb_keyrange.right_exclusive,
156                    }
157                } else {
158                    KeyRange::inf()
159                }
160            },
161            file_size: pb_sstable_info.file_size,
162            table_ids: pb_sstable_info.table_ids,
163            meta_offset: pb_sstable_info.meta_offset,
164            stale_key_count: pb_sstable_info.stale_key_count,
165            total_key_count: pb_sstable_info.total_key_count,
166            min_epoch: pb_sstable_info.min_epoch,
167            max_epoch: pb_sstable_info.max_epoch,
168            uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
169            range_tombstone_count: pb_sstable_info.range_tombstone_count,
170            bloom_filter_kind: PbBloomFilterType::try_from(pb_sstable_info.bloom_filter_kind)
171                .unwrap(),
172            sst_size: if pb_sstable_info.sst_size == 0 {
173                pb_sstable_info.file_size
174            } else {
175                pb_sstable_info.sst_size
176            },
177            vnode_statistics: pb_sstable_info
178                .vnode_statistics
179                .as_ref()
180                .map(VnodeStatistics::from),
181        }
182    }
183}
184
185impl From<&PbSstableInfo> for SstableInfoInner {
186    fn from(pb_sstable_info: &PbSstableInfo) -> Self {
187        assert!(pb_sstable_info.table_ids.is_sorted());
188        Self {
189            object_id: pb_sstable_info.object_id,
190            sst_id: pb_sstable_info.sst_id,
191            key_range: {
192                if let Some(pb_keyrange) = &pb_sstable_info.key_range {
193                    KeyRange {
194                        left: pb_keyrange.left.clone().into(),
195                        right: pb_keyrange.right.clone().into(),
196                        right_exclusive: pb_keyrange.right_exclusive,
197                    }
198                } else {
199                    KeyRange::inf()
200                }
201            },
202            file_size: pb_sstable_info.file_size,
203            table_ids: pb_sstable_info.table_ids.clone(),
204            meta_offset: pb_sstable_info.meta_offset,
205            stale_key_count: pb_sstable_info.stale_key_count,
206            total_key_count: pb_sstable_info.total_key_count,
207            min_epoch: pb_sstable_info.min_epoch,
208            max_epoch: pb_sstable_info.max_epoch,
209            uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
210            range_tombstone_count: pb_sstable_info.range_tombstone_count,
211            bloom_filter_kind: PbBloomFilterType::try_from(pb_sstable_info.bloom_filter_kind)
212                .unwrap(),
213            sst_size: if pb_sstable_info.sst_size == 0 {
214                pb_sstable_info.file_size
215            } else {
216                pb_sstable_info.sst_size
217            },
218            vnode_statistics: pb_sstable_info
219                .vnode_statistics
220                .as_ref()
221                .map(VnodeStatistics::from),
222        }
223    }
224}
225
226impl From<SstableInfoInner> for PbSstableInfo {
227    fn from(sstable_info: SstableInfoInner) -> Self {
228        assert!(sstable_info.table_ids.is_sorted());
229        PbSstableInfo {
230            object_id: sstable_info.object_id,
231            sst_id: sstable_info.sst_id,
232            key_range: {
233                let keyrange = sstable_info.key_range;
234                if keyrange.inf_key_range() {
235                    // For empty key range, we don't need to encode it
236                    // Timetravel will use the default key range to stripped the PbSstableInfo
237                    // Note: If new fields are added, using Default to implement stripped may not work, resulting in an increase in encode size.
238                    None
239                } else {
240                    let pb_key_range = PbKeyRange {
241                        left: keyrange.left.into(),
242                        right: keyrange.right.into(),
243                        right_exclusive: keyrange.right_exclusive,
244                    };
245                    Some(pb_key_range)
246                }
247            },
248
249            file_size: sstable_info.file_size,
250            table_ids: sstable_info.table_ids,
251            meta_offset: sstable_info.meta_offset,
252            stale_key_count: sstable_info.stale_key_count,
253            total_key_count: sstable_info.total_key_count,
254            min_epoch: sstable_info.min_epoch,
255            max_epoch: sstable_info.max_epoch,
256            uncompressed_file_size: sstable_info.uncompressed_file_size,
257            range_tombstone_count: sstable_info.range_tombstone_count,
258            bloom_filter_kind: sstable_info.bloom_filter_kind.into(),
259            sst_size: sstable_info.sst_size,
260            vnode_statistics: sstable_info
261                .vnode_statistics
262                .as_ref()
263                .map(PbVnodeStatistics::from),
264        }
265    }
266}
267
268impl From<&SstableInfoInner> for PbSstableInfo {
269    fn from(sstable_info: &SstableInfoInner) -> Self {
270        assert!(sstable_info.table_ids.is_sorted());
271        PbSstableInfo {
272            object_id: sstable_info.object_id,
273            sst_id: sstable_info.sst_id,
274            key_range: {
275                let keyrange = &sstable_info.key_range;
276                if keyrange.inf_key_range() {
277                    None
278                } else {
279                    let pb_key_range = PbKeyRange {
280                        left: keyrange.left.to_vec(),
281                        right: keyrange.right.to_vec(),
282                        right_exclusive: keyrange.right_exclusive,
283                    };
284                    Some(pb_key_range)
285                }
286            },
287
288            file_size: sstable_info.file_size,
289            table_ids: sstable_info.table_ids.clone(),
290            meta_offset: sstable_info.meta_offset,
291            stale_key_count: sstable_info.stale_key_count,
292            total_key_count: sstable_info.total_key_count,
293            min_epoch: sstable_info.min_epoch,
294            max_epoch: sstable_info.max_epoch,
295            uncompressed_file_size: sstable_info.uncompressed_file_size,
296            range_tombstone_count: sstable_info.range_tombstone_count,
297            bloom_filter_kind: sstable_info.bloom_filter_kind.into(),
298            sst_size: sstable_info.sst_size,
299            vnode_statistics: sstable_info
300                .vnode_statistics
301                .as_ref()
302                .map(PbVnodeStatistics::from),
303        }
304    }
305}
306
307impl VnodeStatistics {
308    pub fn from_map(vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>) -> Self {
309        Self {
310            vnode_user_key_ranges,
311        }
312    }
313
314    /// Returns vnode user key range (`min_user_key`, `max_user_key`) if available.
315    pub fn get_vnode_user_key_range(&self, vnode: VirtualNode) -> Option<&VnodeUserKeyRange> {
316        self.vnode_user_key_ranges.get(&vnode)
317    }
318
319    #[cfg(any(test, feature = "test"))]
320    pub fn vnode_user_key_ranges(&self) -> &BTreeMap<VirtualNode, VnodeUserKeyRange> {
321        &self.vnode_user_key_ranges
322    }
323}
324
325impl SstableInfo {
326    pub fn remove_key_range(&mut self) {
327        let mut sst = self.get_inner();
328        sst.key_range = KeyRange::default();
329        *self = sst.into()
330    }
331}
332
impl SstableIdReader for SstableInfoInner {
    /// Returns the `sst_id` field.
    fn sst_id(&self) -> HummockSstableId {
        self.sst_id
    }
}
338
impl ObjectIdReader for SstableInfoInner {
    /// Returns the `object_id` field.
    fn object_id(&self) -> HummockSstableObjectId {
        self.object_id
    }
}
344
345#[derive(Debug, PartialEq, Clone)]
346#[cfg_attr(any(test, feature = "test"), derive(Default))]
347pub struct SstableInfo(Arc<SstableInfoInner>);
348
349impl From<&PbSstableInfo> for SstableInfo {
350    fn from(s: &PbSstableInfo) -> Self {
351        SstableInfo(SstableInfoInner::from(s).into())
352    }
353}
354
355impl From<PbSstableInfo> for SstableInfo {
356    fn from(s: PbSstableInfo) -> Self {
357        SstableInfo(SstableInfoInner::from(s).into())
358    }
359}
360
361impl From<SstableInfo> for PbSstableInfo {
362    fn from(s: SstableInfo) -> Self {
363        (&s).into()
364    }
365}
366
367impl From<SstableInfoInner> for SstableInfo {
368    fn from(s: SstableInfoInner) -> Self {
369        Self(s.into())
370    }
371}
372
373impl From<&SstableInfo> for PbSstableInfo {
374    fn from(s: &SstableInfo) -> Self {
375        s.0.as_ref().into()
376    }
377}
378
379impl Deref for SstableInfo {
380    type Target = SstableInfoInner;
381
382    fn deref(&self) -> &Self::Target {
383        &self.0
384    }
385}
386
387impl SstableInfo {
388    pub fn get_inner(&self) -> SstableInfoInner {
389        (*self.0).clone()
390    }
391
392    pub fn set_inner(&mut self, inner: SstableInfoInner) {
393        self.0 = Arc::new(inner);
394    }
395}
396
397impl SstableIdReader for SstableInfo {
398    fn sst_id(&self) -> HummockSstableId {
399        self.sst_id
400    }
401}
402
403impl ObjectIdReader for SstableInfo {
404    fn object_id(&self) -> HummockSstableObjectId {
405        self.object_id
406    }
407}