Skip to main content

risingwave_hummock_sdk/
sstable_info.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::mem::size_of;
17use std::ops::Deref;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_pb::hummock::{
24    PbBloomFilterType, PbKeyRange, PbSstableFilterLayout, PbSstableFilterType, PbSstableInfo,
25    PbVnodeStatistics, PbVnodeUserKeyRange,
26};
27
28use crate::key::UserKey;
29use crate::key_range::KeyRange;
30use crate::version::{ObjectIdReader, SstableIdReader};
31use crate::{HummockSstableId, HummockSstableObjectId};
32
/// A `(min_user_key, max_user_key)` pair; `VnodeStatistics` stores one per vnode and
/// treats it as a closed interval.
pub type VnodeUserKeyRange = (UserKey<Bytes>, UserKey<Bytes>);
34
/// Per-vnode user key ranges carried by an SST's metadata.
///
/// Converts to and from `PbVnodeStatistics` through the `From` impls in this file.
#[derive(Debug, PartialEq, Clone, Default)]
pub struct VnodeStatistics {
    /// Per-vnode user key ranges as closed intervals: [`min_user_key`, `max_user_key`].
    vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>,
}
40
/// In-memory counterpart of `PbSstableInfo`.
///
/// The `From` impls in this file convert between the two representations; they assert
/// that `table_ids` is sorted and that the filter metadata is consistent.
#[derive(Debug, PartialEq, Clone)]
pub struct SstableInfoInner {
    /// Returned by `ObjectIdReader::object_id`.
    pub object_id: HummockSstableObjectId,
    /// Returned by `SstableIdReader::sst_id`.
    pub sst_id: HummockSstableId,
    /// Set to `KeyRange::inf()` when the decoded protobuf message carries no key range.
    pub key_range: KeyRange,
    /// Also serves as the fallback for `sst_size` when the decoded protobuf `sst_size` is 0.
    pub file_size: u64,
    /// Must be sorted; the protobuf conversions assert this.
    pub table_ids: Vec<TableId>,
    pub meta_offset: u64,
    pub stale_key_count: u64,
    pub total_key_count: u64,
    pub min_epoch: u64,
    pub max_epoch: u64,
    pub uncompressed_file_size: u64,
    pub range_tombstone_count: u64,
    /// Resolved filter type; encoding to protobuf asserts it is not `SstableFilterUnspecified`.
    pub filter_type: PbSstableFilterType,
    /// Must be `Unspecified` when `filter_type` is `SstableFilterNone`; `Unspecified` is
    /// encoded as an absent protobuf field.
    pub filter_layout: PbSstableFilterLayout,
    /// When decoding protobuf, a value of 0 is replaced by `file_size`.
    pub sst_size: u64,
    /// Optional per-vnode user key ranges.
    pub vnode_statistics: Option<VnodeStatistics>,
}
60
61impl SstableInfoInner {
62    pub fn estimated_encode_len(&self) -> usize {
63        let mut basic = size_of::<u64>() // object_id
64            + size_of::<u64>() // sstable_id
65            + size_of::<u64>() // file_size
66            + self.table_ids.len() * size_of::<u32>() // table_ids
67            + size_of::<u64>() // meta_offset
68            + size_of::<u64>() // stale_key_count
69            + size_of::<u64>() // total_key_count
70            + size_of::<u64>() // min_epoch
71            + size_of::<u64>() // max_epoch
72            + size_of::<u64>() // uncompressed_file_size
73            + size_of::<u64>() // range_tombstone_count
74            + size_of::<u64>(); // sst_size
75        basic += size_of::<u32>(); // filter_type
76        if self.filter_layout != PbSstableFilterLayout::Unspecified {
77            basic += size_of::<u32>(); // filter_layout
78        }
79        basic += self.key_range.left.len() + self.key_range.right.len() + size_of::<bool>();
80        if let Some(vnode_statistics) = &self.vnode_statistics {
81            for (min_key, max_key) in vnode_statistics.vnode_user_key_ranges.values() {
82                basic += size_of::<u32>() + min_key.encoded_len() + max_key.encoded_len();
83            }
84        }
85
86        basic
87    }
88
89    pub fn to_protobuf(&self) -> PbSstableInfo {
90        self.into()
91    }
92}
93
/// Test-only default: decodes a default `PbSstableInfo`.
#[cfg(any(test, feature = "test"))]
impl Default for SstableInfoInner {
    fn default() -> Self {
        Self::from(PbSstableInfo::default())
    }
}
100
101fn filter_metadata_from_pb(
102    filter_type: Option<i32>,
103    filter_layout: Option<i32>,
104    bloom_filter_kind: i32,
105) -> (PbSstableFilterType, PbSstableFilterLayout) {
106    let bloom_filter_kind = PbBloomFilterType::try_from(bloom_filter_kind)
107        .expect("invalid legacy bloom_filter_kind in SST info");
108
109    // Exactly one metadata format is active for each SST: legacy SSTs use
110    // `bloom_filter_kind`, while new SSTs use `filter_type` plus optional `filter_layout`.
111    if let Some(filter_type) = filter_type {
112        assert_eq!(
113            bloom_filter_kind,
114            PbBloomFilterType::BloomFilterUnspecified,
115            "new SST filter metadata must not set legacy bloom_filter_kind"
116        );
117        let filter_type =
118            PbSstableFilterType::try_from(filter_type).expect("invalid filter_type in SST info");
119        assert_ne!(
120            filter_type,
121            PbSstableFilterType::SstableFilterUnspecified,
122            "new SST filter metadata must use a resolved filter_type"
123        );
124
125        let filter_layout = filter_layout
126            .map(|filter_layout| {
127                PbSstableFilterLayout::try_from(filter_layout)
128                    .expect("invalid filter_layout in SST info")
129            })
130            .unwrap_or(PbSstableFilterLayout::Unspecified);
131        if filter_type == PbSstableFilterType::SstableFilterNone {
132            assert_eq!(
133                filter_layout,
134                PbSstableFilterLayout::Unspecified,
135                "SST filter metadata with no filter must not set filter_layout"
136            );
137        }
138
139        return (filter_type, filter_layout);
140    }
141
142    assert!(
143        filter_layout.is_none(),
144        "legacy SST filter metadata must not set filter_layout"
145    );
146    match bloom_filter_kind {
147        PbBloomFilterType::BloomFilterUnspecified => (
148            PbSstableFilterType::SstableFilterNone,
149            PbSstableFilterLayout::Unspecified,
150        ),
151        PbBloomFilterType::Sstable => (
152            PbSstableFilterType::SstableFilterXor16,
153            PbSstableFilterLayout::Plain,
154        ),
155        PbBloomFilterType::Blocked => (
156            PbSstableFilterType::SstableFilterXor16,
157            PbSstableFilterLayout::Blocked,
158        ),
159    }
160}
161
162fn assert_resolved_filter_metadata(
163    filter_type: PbSstableFilterType,
164    filter_layout: PbSstableFilterLayout,
165) {
166    assert_ne!(
167        filter_type,
168        PbSstableFilterType::SstableFilterUnspecified,
169        "SST filter metadata must use a resolved filter_type"
170    );
171    if filter_type == PbSstableFilterType::SstableFilterNone {
172        assert_eq!(
173            filter_layout,
174            PbSstableFilterLayout::Unspecified,
175            "SST filter metadata with no filter must not set filter_layout"
176        );
177    }
178}
179
180impl From<&PbVnodeStatistics> for VnodeStatistics {
181    fn from(info: &PbVnodeStatistics) -> Self {
182        Self {
183            vnode_user_key_ranges: info
184                .vnode_user_key_ranges
185                .iter()
186                .map(|(&vnode, range)| {
187                    let min_key = UserKey::decode(&range.min_key).copy_into();
188                    let max_key = UserKey::decode(&range.max_key).copy_into();
189
190                    // assert shared same vnode and table-id
191                    assert_eq!(min_key.table_id, max_key.table_id);
192                    assert_eq!(min_key.get_vnode_id(), max_key.get_vnode_id());
193
194                    (VirtualNode::from_index(vnode as usize), (min_key, max_key))
195                })
196                .collect(),
197        }
198    }
199}
200
201impl From<PbVnodeStatistics> for VnodeStatistics {
202    fn from(info: PbVnodeStatistics) -> Self {
203        (&info).into()
204    }
205}
206
207impl From<&VnodeStatistics> for PbVnodeStatistics {
208    fn from(info: &VnodeStatistics) -> Self {
209        Self {
210            vnode_user_key_ranges: info
211                .vnode_user_key_ranges
212                .iter()
213                .map(|(vnode, (min_key, max_key))| {
214                    (
215                        vnode.to_index() as u32,
216                        PbVnodeUserKeyRange {
217                            min_key: min_key.encode(),
218                            max_key: max_key.encode(),
219                        },
220                    )
221                })
222                .collect(),
223        }
224    }
225}
226
227impl From<VnodeStatistics> for PbVnodeStatistics {
228    fn from(info: VnodeStatistics) -> Self {
229        (&info).into()
230    }
231}
232
233impl From<PbSstableInfo> for SstableInfoInner {
234    #[expect(
235        deprecated,
236        reason = "read legacy SST filter metadata for compatibility"
237    )]
238    fn from(pb_sstable_info: PbSstableInfo) -> Self {
239        assert!(pb_sstable_info.table_ids.is_sorted());
240        let (filter_type, filter_layout) = filter_metadata_from_pb(
241            pb_sstable_info.filter_type,
242            pb_sstable_info.filter_layout,
243            pb_sstable_info.bloom_filter_kind,
244        );
245        Self {
246            object_id: pb_sstable_info.object_id,
247            sst_id: pb_sstable_info.sst_id,
248            key_range: {
249                // Due to the stripped key range, the key range may be `None`
250                if let Some(pb_keyrange) = pb_sstable_info.key_range {
251                    KeyRange {
252                        left: pb_keyrange.left.into(),
253                        right: pb_keyrange.right.into(),
254                        right_exclusive: pb_keyrange.right_exclusive,
255                    }
256                } else {
257                    KeyRange::inf()
258                }
259            },
260            file_size: pb_sstable_info.file_size,
261            table_ids: pb_sstable_info.table_ids,
262            meta_offset: pb_sstable_info.meta_offset,
263            stale_key_count: pb_sstable_info.stale_key_count,
264            total_key_count: pb_sstable_info.total_key_count,
265            min_epoch: pb_sstable_info.min_epoch,
266            max_epoch: pb_sstable_info.max_epoch,
267            uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
268            range_tombstone_count: pb_sstable_info.range_tombstone_count,
269            filter_type,
270            filter_layout,
271            sst_size: if pb_sstable_info.sst_size == 0 {
272                pb_sstable_info.file_size
273            } else {
274                pb_sstable_info.sst_size
275            },
276            vnode_statistics: pb_sstable_info
277                .vnode_statistics
278                .as_ref()
279                .map(VnodeStatistics::from),
280        }
281    }
282}
283
284impl From<&PbSstableInfo> for SstableInfoInner {
285    #[expect(
286        deprecated,
287        reason = "read legacy SST filter metadata for compatibility"
288    )]
289    fn from(pb_sstable_info: &PbSstableInfo) -> Self {
290        assert!(pb_sstable_info.table_ids.is_sorted());
291        let (filter_type, filter_layout) = filter_metadata_from_pb(
292            pb_sstable_info.filter_type,
293            pb_sstable_info.filter_layout,
294            pb_sstable_info.bloom_filter_kind,
295        );
296        Self {
297            object_id: pb_sstable_info.object_id,
298            sst_id: pb_sstable_info.sst_id,
299            key_range: {
300                if let Some(pb_keyrange) = &pb_sstable_info.key_range {
301                    KeyRange {
302                        left: pb_keyrange.left.clone().into(),
303                        right: pb_keyrange.right.clone().into(),
304                        right_exclusive: pb_keyrange.right_exclusive,
305                    }
306                } else {
307                    KeyRange::inf()
308                }
309            },
310            file_size: pb_sstable_info.file_size,
311            table_ids: pb_sstable_info.table_ids.clone(),
312            meta_offset: pb_sstable_info.meta_offset,
313            stale_key_count: pb_sstable_info.stale_key_count,
314            total_key_count: pb_sstable_info.total_key_count,
315            min_epoch: pb_sstable_info.min_epoch,
316            max_epoch: pb_sstable_info.max_epoch,
317            uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
318            range_tombstone_count: pb_sstable_info.range_tombstone_count,
319            filter_type,
320            filter_layout,
321            sst_size: if pb_sstable_info.sst_size == 0 {
322                pb_sstable_info.file_size
323            } else {
324                pb_sstable_info.sst_size
325            },
326            vnode_statistics: pb_sstable_info
327                .vnode_statistics
328                .as_ref()
329                .map(VnodeStatistics::from),
330        }
331    }
332}
333
334impl From<SstableInfoInner> for PbSstableInfo {
335    #[expect(deprecated, reason = "write legacy SST filter metadata as unset")]
336    fn from(sstable_info: SstableInfoInner) -> Self {
337        assert!(sstable_info.table_ids.is_sorted());
338        assert_resolved_filter_metadata(sstable_info.filter_type, sstable_info.filter_layout);
339        PbSstableInfo {
340            object_id: sstable_info.object_id,
341            sst_id: sstable_info.sst_id,
342            key_range: {
343                let keyrange = sstable_info.key_range;
344                if keyrange.inf_key_range() {
345                    // For empty key range, we don't need to encode it
346                    // Timetravel will use the default key range to stripped the PbSstableInfo
347                    // Note: If new fields are added, using Default to implement stripped may not work, resulting in an increase in encode size.
348                    None
349                } else {
350                    let pb_key_range = PbKeyRange {
351                        left: keyrange.left.into(),
352                        right: keyrange.right.into(),
353                        right_exclusive: keyrange.right_exclusive,
354                    };
355                    Some(pb_key_range)
356                }
357            },
358
359            file_size: sstable_info.file_size,
360            table_ids: sstable_info.table_ids,
361            meta_offset: sstable_info.meta_offset,
362            stale_key_count: sstable_info.stale_key_count,
363            total_key_count: sstable_info.total_key_count,
364            min_epoch: sstable_info.min_epoch,
365            max_epoch: sstable_info.max_epoch,
366            uncompressed_file_size: sstable_info.uncompressed_file_size,
367            range_tombstone_count: sstable_info.range_tombstone_count,
368            bloom_filter_kind: PbBloomFilterType::BloomFilterUnspecified as i32,
369            filter_type: Some(sstable_info.filter_type.into()),
370            filter_layout: (sstable_info.filter_layout != PbSstableFilterLayout::Unspecified)
371                .then_some(sstable_info.filter_layout.into()),
372            sst_size: sstable_info.sst_size,
373            vnode_statistics: sstable_info
374                .vnode_statistics
375                .as_ref()
376                .map(PbVnodeStatistics::from),
377        }
378    }
379}
380
381impl From<&SstableInfoInner> for PbSstableInfo {
382    #[expect(deprecated, reason = "write legacy SST filter metadata as unset")]
383    fn from(sstable_info: &SstableInfoInner) -> Self {
384        assert!(sstable_info.table_ids.is_sorted());
385        assert_resolved_filter_metadata(sstable_info.filter_type, sstable_info.filter_layout);
386        PbSstableInfo {
387            object_id: sstable_info.object_id,
388            sst_id: sstable_info.sst_id,
389            key_range: {
390                let keyrange = &sstable_info.key_range;
391                if keyrange.inf_key_range() {
392                    None
393                } else {
394                    let pb_key_range = PbKeyRange {
395                        left: keyrange.left.to_vec(),
396                        right: keyrange.right.to_vec(),
397                        right_exclusive: keyrange.right_exclusive,
398                    };
399                    Some(pb_key_range)
400                }
401            },
402
403            file_size: sstable_info.file_size,
404            table_ids: sstable_info.table_ids.clone(),
405            meta_offset: sstable_info.meta_offset,
406            stale_key_count: sstable_info.stale_key_count,
407            total_key_count: sstable_info.total_key_count,
408            min_epoch: sstable_info.min_epoch,
409            max_epoch: sstable_info.max_epoch,
410            uncompressed_file_size: sstable_info.uncompressed_file_size,
411            range_tombstone_count: sstable_info.range_tombstone_count,
412            bloom_filter_kind: PbBloomFilterType::BloomFilterUnspecified as i32,
413            filter_type: Some(sstable_info.filter_type.into()),
414            filter_layout: (sstable_info.filter_layout != PbSstableFilterLayout::Unspecified)
415                .then_some(sstable_info.filter_layout.into()),
416            sst_size: sstable_info.sst_size,
417            vnode_statistics: sstable_info
418                .vnode_statistics
419                .as_ref()
420                .map(PbVnodeStatistics::from),
421        }
422    }
423}
424
425impl VnodeStatistics {
426    pub fn from_map(vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>) -> Self {
427        Self {
428            vnode_user_key_ranges,
429        }
430    }
431
432    /// Returns vnode user key range (`min_user_key`, `max_user_key`) if available.
433    pub fn get_vnode_user_key_range(&self, vnode: VirtualNode) -> Option<&VnodeUserKeyRange> {
434        self.vnode_user_key_ranges.get(&vnode)
435    }
436
437    #[cfg(any(test, feature = "test"))]
438    pub fn vnode_user_key_ranges(&self) -> &BTreeMap<VirtualNode, VnodeUserKeyRange> {
439        &self.vnode_user_key_ranges
440    }
441}
442
443impl SstableInfo {
444    pub fn remove_key_range(&mut self) {
445        let mut sst = self.get_inner();
446        sst.key_range = KeyRange::default();
447        *self = sst.into()
448    }
449}
450
451impl SstableIdReader for SstableInfoInner {
452    fn sst_id(&self) -> HummockSstableId {
453        self.sst_id
454    }
455}
456
457impl ObjectIdReader for SstableInfoInner {
458    fn object_id(&self) -> HummockSstableObjectId {
459        self.object_id
460    }
461}
462
/// Shared handle to an `SstableInfoInner`.
///
/// Cloning the handle clones the `Arc`, not the inner value. Fields are read through
/// `Deref`; `get_inner` returns an owned copy and `set_inner` replaces the inner value.
#[derive(Debug, PartialEq, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct SstableInfo(Arc<SstableInfoInner>);
466
467impl From<&PbSstableInfo> for SstableInfo {
468    fn from(s: &PbSstableInfo) -> Self {
469        SstableInfo(SstableInfoInner::from(s).into())
470    }
471}
472
473impl From<PbSstableInfo> for SstableInfo {
474    fn from(s: PbSstableInfo) -> Self {
475        SstableInfo(SstableInfoInner::from(s).into())
476    }
477}
478
479impl From<SstableInfo> for PbSstableInfo {
480    fn from(s: SstableInfo) -> Self {
481        (&s).into()
482    }
483}
484
485impl From<SstableInfoInner> for SstableInfo {
486    fn from(s: SstableInfoInner) -> Self {
487        Self(s.into())
488    }
489}
490
491impl From<&SstableInfo> for PbSstableInfo {
492    fn from(s: &SstableInfo) -> Self {
493        s.0.as_ref().into()
494    }
495}
496
497impl Deref for SstableInfo {
498    type Target = SstableInfoInner;
499
500    fn deref(&self) -> &Self::Target {
501        &self.0
502    }
503}
504
505impl SstableInfo {
506    pub fn get_inner(&self) -> SstableInfoInner {
507        (*self.0).clone()
508    }
509
510    pub fn set_inner(&mut self, inner: SstableInfoInner) {
511        self.0 = Arc::new(inner);
512    }
513}
514
515impl SstableIdReader for SstableInfo {
516    fn sst_id(&self) -> HummockSstableId {
517        self.sst_id
518    }
519}
520
521impl ObjectIdReader for SstableInfo {
522    fn object_id(&self) -> HummockSstableObjectId {
523        self.object_id
524    }
525}
526
#[cfg(test)]
mod tests {
    use super::*;

    /// One decode scenario: the raw protobuf filter fields and the metadata expected back.
    struct DecodeCase {
        case_name: &'static str,
        bloom_filter_kind: Option<PbBloomFilterType>,
        filter_type: Option<PbSstableFilterType>,
        filter_layout: Option<PbSstableFilterLayout>,
        expected_filter_type: PbSstableFilterType,
        expected_filter_layout: PbSstableFilterLayout,
    }

    #[test]
    #[expect(deprecated, reason = "test legacy SST filter metadata compatibility")]
    fn test_filter_metadata_from_pb() {
        let decode_cases = [
            DecodeCase {
                case_name: "new filter metadata",
                bloom_filter_kind: None,
                filter_type: Some(PbSstableFilterType::SstableFilterXor8),
                filter_layout: Some(PbSstableFilterLayout::Plain),
                expected_filter_type: PbSstableFilterType::SstableFilterXor8,
                expected_filter_layout: PbSstableFilterLayout::Plain,
            },
            DecodeCase {
                case_name: "legacy blocked bloom kind",
                bloom_filter_kind: Some(PbBloomFilterType::Blocked),
                filter_type: None,
                filter_layout: None,
                expected_filter_type: PbSstableFilterType::SstableFilterXor16,
                expected_filter_layout: PbSstableFilterLayout::Blocked,
            },
            DecodeCase {
                case_name: "legacy plain bloom kind",
                bloom_filter_kind: Some(PbBloomFilterType::Sstable),
                filter_type: None,
                filter_layout: None,
                expected_filter_type: PbSstableFilterType::SstableFilterXor16,
                expected_filter_layout: PbSstableFilterLayout::Plain,
            },
            DecodeCase {
                case_name: "explicit no filter",
                bloom_filter_kind: None,
                filter_type: Some(PbSstableFilterType::SstableFilterNone),
                filter_layout: None,
                expected_filter_type: PbSstableFilterType::SstableFilterNone,
                expected_filter_layout: PbSstableFilterLayout::Unspecified,
            },
        ];

        // Decoding: each raw field combination must resolve to the expected metadata.
        for case in decode_cases {
            let case_name = case.case_name;
            let raw_bloom_kind = case
                .bloom_filter_kind
                .unwrap_or(PbBloomFilterType::BloomFilterUnspecified);
            let pb_input = PbSstableInfo {
                bloom_filter_kind: i32::from(raw_bloom_kind),
                filter_type: case.filter_type.map(i32::from),
                filter_layout: case.filter_layout.map(i32::from),
                ..Default::default()
            };

            let sst_info = SstableInfoInner::from(pb_input);

            assert_eq!(
                sst_info.filter_type, case.expected_filter_type,
                "{case_name}"
            );
            assert_eq!(
                sst_info.filter_layout, case.expected_filter_layout,
                "{case_name}"
            );
        }

        // Encoding: the legacy field stays unset and an unspecified layout is omitted.
        let pb_sst_info: PbSstableInfo = SstableInfoInner {
            filter_type: PbSstableFilterType::SstableFilterNone,
            filter_layout: PbSstableFilterLayout::Unspecified,
            ..Default::default()
        }
        .into();
        assert_eq!(
            pb_sst_info.bloom_filter_kind,
            PbBloomFilterType::BloomFilterUnspecified as i32
        );
        assert_eq!(
            pb_sst_info.filter_type,
            Some(PbSstableFilterType::SstableFilterNone as i32)
        );
        assert_eq!(pb_sst_info.filter_layout, None);
    }
}