risingwave_hummock_sdk/
sstable_info.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::mem::size_of;
17use std::ops::Deref;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_pb::hummock::{
24    PbBloomFilterType, PbKeyRange, PbSstableFilterType, PbSstableInfo, PbVnodeStatistics,
25    PbVnodeUserKeyRange,
26};
27
28use crate::key::UserKey;
29use crate::key_range::KeyRange;
30use crate::version::{ObjectIdReader, SstableIdReader};
31use crate::{HummockSstableId, HummockSstableObjectId};
32
33pub type VnodeUserKeyRange = (UserKey<Bytes>, UserKey<Bytes>);
34
35#[derive(Debug, PartialEq, Clone, Default)]
36pub struct VnodeStatistics {
37    /// Per-vnode user key ranges as closed intervals: [`min_user_key`, `max_user_key`].
38    vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>,
39}
40
41#[derive(Debug, PartialEq, Clone)]
42#[cfg_attr(any(test, feature = "test"), derive(Default))]
43pub struct SstableInfoInner {
44    pub object_id: HummockSstableObjectId,
45    pub sst_id: HummockSstableId,
46    pub key_range: KeyRange,
47    pub file_size: u64,
48    pub table_ids: Vec<TableId>,
49    pub meta_offset: u64,
50    pub stale_key_count: u64,
51    pub total_key_count: u64,
52    pub min_epoch: u64,
53    pub max_epoch: u64,
54    pub uncompressed_file_size: u64,
55    pub range_tombstone_count: u64,
56    pub bloom_filter_kind: PbBloomFilterType,
57    pub filter_type: PbSstableFilterType,
58    pub sst_size: u64,
59    pub vnode_statistics: Option<VnodeStatistics>,
60}
61
62impl SstableInfoInner {
63    pub fn estimated_encode_len(&self) -> usize {
64        let mut basic = size_of::<u64>() // object_id
65            + size_of::<u64>() // sstable_id
66            + size_of::<u64>() // file_size
67            + self.table_ids.len() * size_of::<u32>() // table_ids
68            + size_of::<u64>() // meta_offset
69            + size_of::<u64>() // stale_key_count
70            + size_of::<u64>() // total_key_count
71            + size_of::<u64>() // min_epoch
72            + size_of::<u64>() // max_epoch
73            + size_of::<u64>() // uncompressed_file_size
74            + size_of::<u64>() // range_tombstone_count
75            + size_of::<u32>() // bloom_filter_kind
76            + size_of::<u32>() // filter_type
77            + size_of::<u64>(); // sst_size
78        basic += self.key_range.left.len() + self.key_range.right.len() + size_of::<bool>();
79        if let Some(vnode_statistics) = &self.vnode_statistics {
80            for (min_key, max_key) in vnode_statistics.vnode_user_key_ranges.values() {
81                basic += size_of::<u32>() + min_key.encoded_len() + max_key.encoded_len();
82            }
83        }
84
85        basic
86    }
87
88    pub fn to_protobuf(&self) -> PbSstableInfo {
89        self.into()
90    }
91}
92
93impl From<&PbVnodeStatistics> for VnodeStatistics {
94    fn from(info: &PbVnodeStatistics) -> Self {
95        Self {
96            vnode_user_key_ranges: info
97                .vnode_user_key_ranges
98                .iter()
99                .map(|(&vnode, range)| {
100                    let min_key = UserKey::decode(&range.min_key).copy_into();
101                    let max_key = UserKey::decode(&range.max_key).copy_into();
102
103                    // assert shared same vnode and table-id
104                    assert_eq!(min_key.table_id, max_key.table_id);
105                    assert_eq!(min_key.get_vnode_id(), max_key.get_vnode_id());
106
107                    (VirtualNode::from_index(vnode as usize), (min_key, max_key))
108                })
109                .collect(),
110        }
111    }
112}
113
114impl From<PbVnodeStatistics> for VnodeStatistics {
115    fn from(info: PbVnodeStatistics) -> Self {
116        (&info).into()
117    }
118}
119
120impl From<&VnodeStatistics> for PbVnodeStatistics {
121    fn from(info: &VnodeStatistics) -> Self {
122        Self {
123            vnode_user_key_ranges: info
124                .vnode_user_key_ranges
125                .iter()
126                .map(|(vnode, (min_key, max_key))| {
127                    (
128                        vnode.to_index() as u32,
129                        PbVnodeUserKeyRange {
130                            min_key: min_key.encode(),
131                            max_key: max_key.encode(),
132                        },
133                    )
134                })
135                .collect(),
136        }
137    }
138}
139
140impl From<VnodeStatistics> for PbVnodeStatistics {
141    fn from(info: VnodeStatistics) -> Self {
142        (&info).into()
143    }
144}
145
146impl From<PbSstableInfo> for SstableInfoInner {
147    fn from(pb_sstable_info: PbSstableInfo) -> Self {
148        assert!(pb_sstable_info.table_ids.is_sorted());
149        Self {
150            object_id: pb_sstable_info.object_id,
151            sst_id: pb_sstable_info.sst_id,
152            key_range: {
153                // Due to the stripped key range, the key range may be `None`
154                if let Some(pb_keyrange) = pb_sstable_info.key_range {
155                    KeyRange {
156                        left: pb_keyrange.left.into(),
157                        right: pb_keyrange.right.into(),
158                        right_exclusive: pb_keyrange.right_exclusive,
159                    }
160                } else {
161                    KeyRange::inf()
162                }
163            },
164            file_size: pb_sstable_info.file_size,
165            table_ids: pb_sstable_info.table_ids,
166            meta_offset: pb_sstable_info.meta_offset,
167            stale_key_count: pb_sstable_info.stale_key_count,
168            total_key_count: pb_sstable_info.total_key_count,
169            min_epoch: pb_sstable_info.min_epoch,
170            max_epoch: pb_sstable_info.max_epoch,
171            uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
172            range_tombstone_count: pb_sstable_info.range_tombstone_count,
173            bloom_filter_kind: PbBloomFilterType::try_from(pb_sstable_info.bloom_filter_kind)
174                .unwrap(),
175            filter_type: PbSstableFilterType::try_from(pb_sstable_info.filter_type)
176                .unwrap_or(PbSstableFilterType::SstableFilterUnspecified),
177            sst_size: if pb_sstable_info.sst_size == 0 {
178                pb_sstable_info.file_size
179            } else {
180                pb_sstable_info.sst_size
181            },
182            vnode_statistics: pb_sstable_info
183                .vnode_statistics
184                .as_ref()
185                .map(VnodeStatistics::from),
186        }
187    }
188}
189
190impl From<&PbSstableInfo> for SstableInfoInner {
191    fn from(pb_sstable_info: &PbSstableInfo) -> Self {
192        assert!(pb_sstable_info.table_ids.is_sorted());
193        Self {
194            object_id: pb_sstable_info.object_id,
195            sst_id: pb_sstable_info.sst_id,
196            key_range: {
197                if let Some(pb_keyrange) = &pb_sstable_info.key_range {
198                    KeyRange {
199                        left: pb_keyrange.left.clone().into(),
200                        right: pb_keyrange.right.clone().into(),
201                        right_exclusive: pb_keyrange.right_exclusive,
202                    }
203                } else {
204                    KeyRange::inf()
205                }
206            },
207            file_size: pb_sstable_info.file_size,
208            table_ids: pb_sstable_info.table_ids.clone(),
209            meta_offset: pb_sstable_info.meta_offset,
210            stale_key_count: pb_sstable_info.stale_key_count,
211            total_key_count: pb_sstable_info.total_key_count,
212            min_epoch: pb_sstable_info.min_epoch,
213            max_epoch: pb_sstable_info.max_epoch,
214            uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
215            range_tombstone_count: pb_sstable_info.range_tombstone_count,
216            bloom_filter_kind: PbBloomFilterType::try_from(pb_sstable_info.bloom_filter_kind)
217                .unwrap(),
218            filter_type: PbSstableFilterType::try_from(pb_sstable_info.filter_type)
219                .unwrap_or(PbSstableFilterType::SstableFilterUnspecified),
220            sst_size: if pb_sstable_info.sst_size == 0 {
221                pb_sstable_info.file_size
222            } else {
223                pb_sstable_info.sst_size
224            },
225            vnode_statistics: pb_sstable_info
226                .vnode_statistics
227                .as_ref()
228                .map(VnodeStatistics::from),
229        }
230    }
231}
232
233impl From<SstableInfoInner> for PbSstableInfo {
234    fn from(sstable_info: SstableInfoInner) -> Self {
235        assert!(sstable_info.table_ids.is_sorted());
236        PbSstableInfo {
237            object_id: sstable_info.object_id,
238            sst_id: sstable_info.sst_id,
239            key_range: {
240                let keyrange = sstable_info.key_range;
241                if keyrange.inf_key_range() {
242                    // For empty key range, we don't need to encode it
243                    // Timetravel will use the default key range to stripped the PbSstableInfo
244                    // Note: If new fields are added, using Default to implement stripped may not work, resulting in an increase in encode size.
245                    None
246                } else {
247                    let pb_key_range = PbKeyRange {
248                        left: keyrange.left.into(),
249                        right: keyrange.right.into(),
250                        right_exclusive: keyrange.right_exclusive,
251                    };
252                    Some(pb_key_range)
253                }
254            },
255
256            file_size: sstable_info.file_size,
257            table_ids: sstable_info.table_ids,
258            meta_offset: sstable_info.meta_offset,
259            stale_key_count: sstable_info.stale_key_count,
260            total_key_count: sstable_info.total_key_count,
261            min_epoch: sstable_info.min_epoch,
262            max_epoch: sstable_info.max_epoch,
263            uncompressed_file_size: sstable_info.uncompressed_file_size,
264            range_tombstone_count: sstable_info.range_tombstone_count,
265            bloom_filter_kind: sstable_info.bloom_filter_kind.into(),
266            filter_type: sstable_info.filter_type.into(),
267            sst_size: sstable_info.sst_size,
268            vnode_statistics: sstable_info
269                .vnode_statistics
270                .as_ref()
271                .map(PbVnodeStatistics::from),
272        }
273    }
274}
275
276impl From<&SstableInfoInner> for PbSstableInfo {
277    fn from(sstable_info: &SstableInfoInner) -> Self {
278        assert!(sstable_info.table_ids.is_sorted());
279        PbSstableInfo {
280            object_id: sstable_info.object_id,
281            sst_id: sstable_info.sst_id,
282            key_range: {
283                let keyrange = &sstable_info.key_range;
284                if keyrange.inf_key_range() {
285                    None
286                } else {
287                    let pb_key_range = PbKeyRange {
288                        left: keyrange.left.to_vec(),
289                        right: keyrange.right.to_vec(),
290                        right_exclusive: keyrange.right_exclusive,
291                    };
292                    Some(pb_key_range)
293                }
294            },
295
296            file_size: sstable_info.file_size,
297            table_ids: sstable_info.table_ids.clone(),
298            meta_offset: sstable_info.meta_offset,
299            stale_key_count: sstable_info.stale_key_count,
300            total_key_count: sstable_info.total_key_count,
301            min_epoch: sstable_info.min_epoch,
302            max_epoch: sstable_info.max_epoch,
303            uncompressed_file_size: sstable_info.uncompressed_file_size,
304            range_tombstone_count: sstable_info.range_tombstone_count,
305            bloom_filter_kind: sstable_info.bloom_filter_kind.into(),
306            filter_type: sstable_info.filter_type.into(),
307            sst_size: sstable_info.sst_size,
308            vnode_statistics: sstable_info
309                .vnode_statistics
310                .as_ref()
311                .map(PbVnodeStatistics::from),
312        }
313    }
314}
315
316impl VnodeStatistics {
317    pub fn from_map(vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>) -> Self {
318        Self {
319            vnode_user_key_ranges,
320        }
321    }
322
323    /// Returns vnode user key range (`min_user_key`, `max_user_key`) if available.
324    pub fn get_vnode_user_key_range(&self, vnode: VirtualNode) -> Option<&VnodeUserKeyRange> {
325        self.vnode_user_key_ranges.get(&vnode)
326    }
327
328    #[cfg(any(test, feature = "test"))]
329    pub fn vnode_user_key_ranges(&self) -> &BTreeMap<VirtualNode, VnodeUserKeyRange> {
330        &self.vnode_user_key_ranges
331    }
332}
333
334impl SstableInfo {
335    pub fn remove_key_range(&mut self) {
336        let mut sst = self.get_inner();
337        sst.key_range = KeyRange::default();
338        *self = sst.into()
339    }
340
341    pub fn filter_type_compatible_with(&self, filter_type: PbSstableFilterType) -> bool {
342        match self.filter_type {
343            PbSstableFilterType::SstableFilterUnspecified => {
344                filter_type == PbSstableFilterType::SstableFilterXor16
345            }
346            ty => ty == filter_type,
347        }
348    }
349}
350
351impl SstableIdReader for SstableInfoInner {
352    fn sst_id(&self) -> HummockSstableId {
353        self.sst_id
354    }
355}
356
357impl ObjectIdReader for SstableInfoInner {
358    fn object_id(&self) -> HummockSstableObjectId {
359        self.object_id
360    }
361}
362
363#[derive(Debug, PartialEq, Clone)]
364#[cfg_attr(any(test, feature = "test"), derive(Default))]
365pub struct SstableInfo(Arc<SstableInfoInner>);
366
367impl From<&PbSstableInfo> for SstableInfo {
368    fn from(s: &PbSstableInfo) -> Self {
369        SstableInfo(SstableInfoInner::from(s).into())
370    }
371}
372
373impl From<PbSstableInfo> for SstableInfo {
374    fn from(s: PbSstableInfo) -> Self {
375        SstableInfo(SstableInfoInner::from(s).into())
376    }
377}
378
379impl From<SstableInfo> for PbSstableInfo {
380    fn from(s: SstableInfo) -> Self {
381        (&s).into()
382    }
383}
384
385impl From<SstableInfoInner> for SstableInfo {
386    fn from(s: SstableInfoInner) -> Self {
387        Self(s.into())
388    }
389}
390
391impl From<&SstableInfo> for PbSstableInfo {
392    fn from(s: &SstableInfo) -> Self {
393        s.0.as_ref().into()
394    }
395}
396
397impl Deref for SstableInfo {
398    type Target = SstableInfoInner;
399
400    fn deref(&self) -> &Self::Target {
401        &self.0
402    }
403}
404
405impl SstableInfo {
406    pub fn get_inner(&self) -> SstableInfoInner {
407        (*self.0).clone()
408    }
409
410    pub fn set_inner(&mut self, inner: SstableInfoInner) {
411        self.0 = Arc::new(inner);
412    }
413}
414
415impl SstableIdReader for SstableInfo {
416    fn sst_id(&self) -> HummockSstableId {
417        self.sst_id
418    }
419}
420
421impl ObjectIdReader for SstableInfo {
422    fn object_id(&self) -> HummockSstableObjectId {
423        self.object_id
424    }
425}