risingwave_storage/hummock/sstable/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Hummock state store's SST builder, format and iterator

// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
mod block;

use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::ops::{BitXor, Bound, Range};

pub use block::*;
mod block_iterator;
pub use block_iterator::*;
mod bloom;
mod xor_filter;
pub use bloom::BloomFilterBuilder;
use serde::{Deserialize, Serialize};
pub use xor_filter::{
    BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder, XorFilterReader,
};
pub mod builder;
pub use builder::*;
pub mod writer;
use risingwave_common::catalog::TableId;
pub use writer::*;
mod forward_sstable_iterator;
pub mod multi_builder;
use bytes::{Buf, BufMut};
pub use forward_sstable_iterator::*;
use tracing::warn;
mod backward_sstable_iterator;
pub use backward_sstable_iterator::*;
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};

mod filter;
mod utils;

pub use filter::FilterBuilder;
pub use utils::{CompressionAlgorithm, xxhash64_checksum, xxhash64_verify};
use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
use xxhash_rust::{xxh32, xxh64};

use super::{HummockError, HummockResult};
use crate::hummock::CachePolicy;
use crate::store::ReadOptions;

const MAGIC: u32 = 0x5785ab73;
const OLD_VERSION: u32 = 1;
const VERSION: u32 = 2;

/// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11, and the delete ranges are
/// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`.
/// They can be transformed into the events below:
/// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
/// -epoch3> }`
/// From these we can derive the monotonic events (ordered by user key) below:
/// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
/// which means that the delete range [0, wmk1) is at epoch1, the delete range [wmk1, wmk2) is at
/// epoch2, and so on. In this example, at the event key wmk1 (5) the delete range changes from
/// epoch1 to epoch2, so the `new_epoch` is epoch2. epoch2 applies from the event key wmk1 (5) up
/// to, but not including, the next event key wmk2 (7).
/// If there are no range deletes between the current event key and the next event key,
/// `new_epoch` will be `HummockEpoch::MAX`.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    pub new_epoch: HummockEpoch,
}

impl MonotonicDeleteEvent {
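    /// Format (mirrors the encoding below):
    ///
    /// ```plain
    /// | left user key (length-prefixed) | exclude left key flag (1B) | new epoch (8B) |
    /// ```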
    pub fn encode(&self, mut buf: impl BufMut) {
        self.event_key
            .left_user_key
            .encode_length_prefixed(&mut buf);
        buf.put_u8(if self.event_key.is_exclude_left_key {
            1
        } else {
            0
        });
        buf.put_u64_le(self.new_epoch);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
        let user_key = UserKey::decode_length_prefixed(buf);
        let exclude_left_key_flag = buf.get_u8();
        let is_exclude_left_key = match exclude_left_key_flag {
            0 => false,
            1 => true,
            _ => panic!("exclusive flag should be either 0 or 1"),
        };
        let new_epoch = buf.get_u64_le();
        Self {
            event_key: PointRange {
                left_user_key: user_key,
                is_exclude_left_key,
            },
            new_epoch,
        }
    }
}

#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    id: HummockSstableObjectId,
    meta: SstableMeta,
}

impl From<SerdeSstable> for Sstable {
    fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
        // Set `skip_bloom_filter_in_serde` to false here: whether the bloom filter is present in
        // `meta` was already decided by the serializer.
        Sstable::new(id, meta, false)
    }
}

/// [`Sstable`] is a handle for accessing an SST.
#[derive(Clone, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    pub id: HummockSstableObjectId,
    pub meta: SstableMeta,
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
    /// SST serde happens when an SST meta is written to the meta disk cache.
    /// Excluding the bloom filter from serde reduces the meta disk cache entry size and the
    /// amount of disk IO, at the cost of making the bloom filter useless for the deserialized
    /// handle.
    #[serde(skip)]
    skip_bloom_filter_in_serde: bool,
}

impl Serialize for Sstable {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut serde_sstable = SerdeSstable {
            id: self.id,
            meta: self.meta.clone(),
        };
        if !self.skip_bloom_filter_in_serde {
            serde_sstable.meta.bloom_filter = self.filter_reader.encode_to_bytes();
        }
        serde_sstable.serialize(serializer)
    }
}

impl Debug for Sstable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sstable")
            .field("id", &self.id)
            .field("meta", &self.meta)
            .finish()
    }
}

impl Sstable {
    pub fn new(
        id: HummockSstableObjectId,
        mut meta: SstableMeta,
        skip_bloom_filter_in_serde: bool,
    ) -> Self {
        let filter_data = std::mem::take(&mut meta.bloom_filter);
        let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
        Self {
            id,
            meta,
            filter_reader,
            skip_bloom_filter_in_serde,
        }
    }

    #[inline(always)]
    pub fn has_bloom_filter(&self) -> bool {
        !self.filter_reader.is_empty()
    }

    pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
        let block_meta = &self.meta.block_metas[block_index];
        let range =
            block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
        let uncompressed_capacity = block_meta.uncompressed_size as usize;
        (range, uncompressed_capacity)
    }

    #[inline(always)]
    pub fn hash_for_bloom_filter_u32(dist_key: &[u8], table_id: u32) -> u32 {
        let dist_key_hash = xxh32::xxh32(dist_key, 0);
        // Mix in the table id so that identical distribution keys in different tables hash to
        // different bloom filter values.
        table_id.bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn hash_for_bloom_filter(dist_key: &[u8], table_id: u32) -> u64 {
        let dist_key_hash = xxh64::xxh64(dist_key, 0);
        // Mix in the table id so that identical distribution keys in different tables hash to
        // different bloom filter values.
        (table_id as u64).bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
        self.filter_reader.may_match(user_key_range, hash)
    }

    #[inline(always)]
    pub fn block_count(&self) -> usize {
        self.meta.block_metas.len()
    }

    #[inline(always)]
    pub fn estimate_size(&self) -> usize {
        8 /* id */ + self.filter_reader.estimate_size() + self.meta.encoded_size()
    }
}

#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    pub smallest_key: Vec<u8>,
    pub offset: u32,
    pub len: u32,
    pub uncompressed_size: u32,
    pub total_key_count: u32,
    pub stale_key_count: u32,
}

impl BlockMeta {
    /// Format:
    ///
    /// ```plain
    /// | offset (4B) | len (4B) | uncompressed size (4B) | total key count (4B) |
    /// | stale key count (4B) | smallest key len (4B) | smallest key |
    /// ```
    pub fn encode(&self, mut buf: impl BufMut) {
        buf.put_u32_le(self.offset);
        buf.put_u32_le(self.len);
        buf.put_u32_le(self.uncompressed_size);
        buf.put_u32_le(self.total_key_count);
        buf.put_u32_le(self.stale_key_count);
        put_length_prefixed_slice(buf, &self.smallest_key);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();

        let total_key_count = buf.get_u32_le();
        let stale_key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = 0;
        let stale_key_count = 0;
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        24 /* offset + len + key len + uncompressed size + total key count + stale key count */ + self.smallest_key.len()
    }

    pub fn table_id(&self) -> TableId {
        FullKey::decode(&self.smallest_key).user_key.table_id
    }
}

#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    pub block_metas: Vec<BlockMeta>,
    pub bloom_filter: Vec<u8>,
    pub estimated_size: u32,
    pub key_count: u32,
    pub smallest_key: Vec<u8>,
    pub largest_key: Vec<u8>,
    pub meta_offset: u64,
    /// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11, and the delete ranges are
    /// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`.
    /// They can be transformed into the events below:
    /// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
    /// -epoch3> }`
    /// From these we can derive the monotonic events (ordered by user key) below:
    /// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
    /// which means that the delete range [0, wmk1) is at epoch1, the delete range [wmk1, wmk2) is
    /// at epoch2, and so on. In this example, at the event key wmk1 (5) the delete range changes
    /// from epoch1 to epoch2, so the `new_epoch` is epoch2. epoch2 applies from the event key
    /// wmk1 (5) up to, but not including, the next event key wmk2 (7).
    /// If there are no range deletes between the current event key and the next event key,
    /// `new_epoch` will be `HummockEpoch::MAX`.
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    /// Format version, for further compatibility.
    pub version: u32,
}

impl SstableMeta {
    /// Format:
    ///
    /// ```plain
    /// | N (4B) |
    /// | block meta 0 | ... | block meta N-1 |
    /// | bloom filter len (4B) | bloom filter |
    /// | estimated size (4B) | key count (4B) |
    /// | smallest key len (4B) | smallest key |
    /// | largest key len (4B) | largest key |
    /// | K (4B) |
    /// | tombstone-event 0 | ... | tombstone-event K-1 |
    /// | file offset of this meta block (8B) |
    /// | checksum (8B) | version (4B) | magic (4B) |
    /// ```
    pub fn encode_to_bytes(&self) -> Vec<u8> {
        let encoded_size = self.encoded_size();
        let mut buf = Vec::with_capacity(encoded_size);
        self.encode_to(&mut buf);
        buf
    }

    pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
        let start = buf.as_ref().len();

        buf.put_u32_le(
            utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "overflow: can't convert block_metas_len {} into u32, table {}",
                    self.block_metas.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        for block_meta in &self.block_metas {
            block_meta.encode(&mut buf);
        }
        put_length_prefixed_slice(&mut buf, &self.bloom_filter);
        buf.put_u32_le(self.estimated_size);
        buf.put_u32_le(self.key_count);
        put_length_prefixed_slice(&mut buf, &self.smallest_key);
        put_length_prefixed_slice(&mut buf, &self.largest_key);
        #[expect(deprecated)]
        buf.put_u32_le(
            utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "overflow: can't convert monotonic_tombstone_events_len {} into u32, table {}",
                    self.monotonic_tombstone_events.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        #[expect(deprecated)]
        for monotonic_tombstone_event in &self.monotonic_tombstone_events {
            monotonic_tombstone_event.encode(&mut buf);
        }
        buf.put_u64_le(self.meta_offset);

        let end = buf.as_ref().len();

        let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
        buf.put_u64_le(checksum);
        buf.put_u32_le(VERSION);
        buf.put_u32_le(MAGIC);
    }

    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
        let mut cursor = buf.len();

        cursor -= 4;
        let magic = (&buf[cursor..]).get_u32_le();
        if magic != MAGIC {
            return Err(HummockError::magic_mismatch(MAGIC, magic));
        }

        cursor -= 4;
        let version = (&buf[cursor..cursor + 4]).get_u32_le();
        if version != VERSION && version != OLD_VERSION {
            return Err(HummockError::invalid_format_version(version));
        }

        cursor -= 8;
        let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
        let buf = &mut &buf[..cursor];
        xxhash64_verify(buf, checksum)?;

        let block_meta_count = buf.get_u32_le() as usize;
        let mut block_metas = Vec::with_capacity(block_meta_count);
        if version == OLD_VERSION {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode_from_v1(buf));
            }
        } else {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode(buf));
            }
        }

        let bloom_filter = get_length_prefixed_slice(buf);
        let estimated_size = buf.get_u32_le();
        let key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        let largest_key = get_length_prefixed_slice(buf);
        let tomb_event_count = buf.get_u32_le() as usize;
        let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
        for _ in 0..tomb_event_count {
            let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
            monotonic_tombstone_events.push(monotonic_tombstone_event);
        }
        let meta_offset = buf.get_u64_le();

        if !monotonic_tombstone_events.is_empty() {
            warn!(
                count = monotonic_tombstone_events.len(),
                tables = ?monotonic_tombstone_events
                    .iter()
                    .map(|event| event.event_key.left_user_key.table_id)
                    .collect::<HashSet<_>>(),
                "read non-empty range tombstones");
        }

        #[expect(deprecated)]
        Ok(Self {
            block_metas,
            bloom_filter,
            estimated_size,
            key_count,
            smallest_key,
            largest_key,
            meta_offset,
            monotonic_tombstone_events,
            version,
        })
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        4 // block meta count
            + self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.encoded_size())
            .sum::<usize>()
            + 4 // monotonic tombstone events len
            + 4 // bloom filter len
            + self.bloom_filter.len()
            + 4 // estimated size
            + 4 // key count
            + 4 // key len
            + self.smallest_key.len()
            + 4 // key len
            + self.largest_key.len()
            + 8 // meta offset
            + 8 // checksum
            + 4 // version
            + 4 // magic
    }
}

#[derive(Default)]
pub struct SstableIteratorReadOptions {
    pub cache_policy: CachePolicy,
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    pub max_preload_retry_times: usize,
    pub prefetch_for_large_query: bool,
}

impl SstableIteratorReadOptions {
    pub fn from_read_options(read_options: &ReadOptions) -> Self {
        Self {
            cache_policy: read_options.cache_policy,
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: read_options.prefetch_options.for_large_query,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

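    // A minimal check of the table-id mixing in `Sstable::hash_for_bloom_filter` and
    // `hash_for_bloom_filter_u32`: since the table id is XOR-ed into the key hash, the same
    // distribution key under different table ids must yield different bloom filter hashes.
    // The key and table ids below are illustrative only.
    #[test]
    fn test_bloom_filter_hash_mixes_table_id() {
        let dist_key = b"distribution-key";
        assert_ne!(
            Sstable::hash_for_bloom_filter(dist_key, 1),
            Sstable::hash_for_bloom_filter(dist_key, 2)
        );
        assert_ne!(
            Sstable::hash_for_bloom_filter_u32(dist_key, 1),
            Sstable::hash_for_bloom_filter_u32(dist_key, 2)
        );
    }
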
    #[test]
    fn test_sstable_meta_enc_dec() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![
                BlockMeta {
                    smallest_key: b"0-smallest-key".to_vec(),
                    len: 100,
                    ..Default::default()
                },
                BlockMeta {
                    smallest_key: b"5-some-key".to_vec(),
                    offset: 100,
                    len: 100,
                    ..Default::default()
                },
            ],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let sz = meta.encoded_size();
        let buf = meta.encode_to_bytes();
        assert_eq!(sz, buf.len());
        let decoded_meta = SstableMeta::decode(&buf[..]).unwrap();
        assert_eq!(decoded_meta, meta);

        println!("buf: {}", buf.len());
    }
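
    // A minimal round-trip sketch of the `BlockMeta` layout documented above `BlockMeta::encode`:
    // encode one meta, check that `encoded_size` matches the bytes written, and decode it back.
    // The field values are illustrative only.
    #[test]
    fn test_block_meta_enc_dec() {
        let block_meta = BlockMeta {
            smallest_key: b"0-smallest-key".to_vec(),
            offset: 100,
            len: 100,
            uncompressed_size: 120,
            total_key_count: 10,
            stale_key_count: 2,
        };
        let mut buf = Vec::new();
        block_meta.encode(&mut buf);
        assert_eq!(block_meta.encoded_size(), buf.len());
        let decoded = BlockMeta::decode(&mut &buf[..]);
        assert_eq!(decoded, block_meta);
    }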

    #[tokio::test]
    async fn test_sstable_serde() {
        let (_, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            (0..100).map(|x| {
                (
                    iterator_test_key_of(x),
                    HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec()),
                )
            }),
        )
        .await;

        // skip the bloom filter in sst serde
        let sstable = Sstable::new(42.into(), meta.clone(), true);

        let buffer = bincode::serialize(&sstable).unwrap();

        let s: Sstable = bincode::deserialize(&buffer).unwrap();

        assert_eq!(s.id, sstable.id);
        assert_eq!(s.meta, sstable.meta);
        assert!(!sstable.filter_reader.is_empty());
        // the filter reader is empty because the bloom filter is skipped in serde
        assert!(s.filter_reader.is_empty());

        // include the bloom filter in sst serde
        let sstable = Sstable::new(42.into(), meta, false);

        let buffer = bincode::serialize(&sstable).unwrap();

        let s: Sstable = bincode::deserialize(&buffer).unwrap();

        assert_eq!(s.id, sstable.id);
        assert_eq!(s.meta, sstable.meta);
        assert_eq!(
            s.filter_reader.encode_to_bytes(),
            sstable.filter_reader.encode_to_bytes()
        );
    }
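
    // A small sketch of the trailer validation in `SstableMeta::decode`: the last 4 bytes must
    // equal `MAGIC`, so corrupting them makes decoding fail. The meta below is illustrative only.
    #[test]
    fn test_sstable_meta_decode_corrupted_magic() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![],
            bloom_filter: vec![],
            estimated_size: 0,
            key_count: 0,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 0,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let mut buf = meta.encode_to_bytes();
        let last = buf.len() - 1;
        // Flip the last byte of the little-endian magic number.
        buf[last] ^= 0xff;
        assert!(SstableMeta::decode(&buf[..]).is_err());
    }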
}