risingwave_storage/hummock/sstable/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Hummock state store's SST builder, format and iterator

// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
mod block;

use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::ops::{BitXor, Bound, Range};

pub use block::*;
mod block_iterator;
pub use block_iterator::*;
mod bloom;
mod xor_filter;
pub use bloom::BloomFilterBuilder;
use serde::{Deserialize, Serialize};
pub use xor_filter::{
    BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder, XorFilterReader,
};
pub mod builder;
pub use builder::*;
pub mod writer;
use risingwave_common::catalog::TableId;
pub use writer::*;
mod forward_sstable_iterator;
pub mod multi_builder;
use bytes::{Buf, BufMut};
pub use forward_sstable_iterator::*;
use tracing::warn;
mod backward_sstable_iterator;
pub use backward_sstable_iterator::*;
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};

mod filter;
mod sstable_object_id_manager;
mod utils;

pub use filter::FilterBuilder;
pub use sstable_object_id_manager::*;
pub use utils::CompressionAlgorithm;
use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
use xxhash_rust::{xxh32, xxh64};

use self::utils::{xxhash64_checksum, xxhash64_verify};
use super::{HummockError, HummockResult};
use crate::hummock::CachePolicy;
use crate::store::ReadOptions;

const MAGIC: u32 = 0x5785ab73;
const OLD_VERSION: u32 = 1;
const VERSION: u32 = 2;
/// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11. The delete ranges
/// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`
/// can be transformed into the events below:
/// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
/// -epoch3> }`
/// From these we can derive monotonic events (ordered by user key):
/// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
/// which means that the delete range of [0, wmk1) is epoch1, the delete range of [wmk1, wmk2) is
/// epoch2, etc. In this example, at the event key wmk1 (5) the delete range changes from epoch1
/// to epoch2, so the `new_epoch` is epoch2. epoch2 applies from the event key wmk1 (5) up to the
/// next event key wmk2 (7) (exclusive).
/// If there are no range deletes between the current event key and the next one, `new_epoch` is
/// `HummockEpoch::MAX`.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    pub new_epoch: HummockEpoch,
}

impl MonotonicDeleteEvent {
    pub fn encode(&self, mut buf: impl BufMut) {
        self.event_key
            .left_user_key
            .encode_length_prefixed(&mut buf);
        buf.put_u8(if self.event_key.is_exclude_left_key {
            1
        } else {
            0
        });
        buf.put_u64_le(self.new_epoch);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
        let user_key = UserKey::decode_length_prefixed(buf);
        let exclude_left_key_flag = buf.get_u8();
        let is_exclude_left_key = match exclude_left_key_flag {
            0 => false,
            1 => true,
            _ => panic!("exclusive flag should be either 0 or 1"),
        };
        let new_epoch = buf.get_u64_le();
        Self {
            event_key: PointRange {
                left_user_key: user_key,
                is_exclude_left_key,
            },
            new_epoch,
        }
    }
}

#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    id: HummockSstableObjectId,
    meta: SstableMeta,
}

impl From<SerdeSstable> for Sstable {
    fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
        Sstable::new(id, meta)
    }
}

/// [`Sstable`] is a handle for accessing an SST.
#[derive(Clone, Serialize, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    pub id: HummockSstableObjectId,
    pub meta: SstableMeta,
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
}

impl Debug for Sstable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sstable")
            .field("id", &self.id)
            .field("meta", &self.meta)
            .finish()
    }
}

impl Sstable {
    pub fn new(id: HummockSstableObjectId, mut meta: SstableMeta) -> Self {
        let filter_data = std::mem::take(&mut meta.bloom_filter);
        let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
        Self {
            id,
            meta,
            filter_reader,
        }
    }

    #[inline(always)]
    pub fn has_bloom_filter(&self) -> bool {
        !self.filter_reader.is_empty()
    }

    pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
        let block_meta = &self.meta.block_metas[block_index];
        let range =
            block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
        let uncompressed_capacity = block_meta.uncompressed_size as usize;
        (range, uncompressed_capacity)
    }

    #[inline(always)]
    pub fn hash_for_bloom_filter_u32(dist_key: &[u8], table_id: u32) -> u32 {
        let dist_key_hash = xxh32::xxh32(dist_key, 0);
        // XOR the table id into the key hash so that identical dist keys from different tables
        // produce distinct filter hashes.
        table_id.bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn hash_for_bloom_filter(dist_key: &[u8], table_id: u32) -> u64 {
        let dist_key_hash = xxh64::xxh64(dist_key, 0);
        // Same as the 32-bit variant: XOR the table id into the key hash to keep per-table keys
        // distinct.
        (table_id as u64).bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
        self.filter_reader.may_match(user_key_range, hash)
    }

    #[inline(always)]
    pub fn block_count(&self) -> usize {
        self.meta.block_metas.len()
    }

    #[inline(always)]
    pub fn estimate_size(&self) -> usize {
        8 /* id */ + self.filter_reader.estimate_size() + self.meta.encoded_size()
    }
}

#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    pub smallest_key: Vec<u8>,
    pub offset: u32,
    pub len: u32,
    pub uncompressed_size: u32,
    pub total_key_count: u32,
    pub stale_key_count: u32,
}

impl BlockMeta {
    /// Format:
    ///
    /// ```plain
    /// | offset (4B) | len (4B) | uncompressed size (4B) | total key count (4B) |
    /// | stale key count (4B) | smallest key len (4B) | smallest key |
    /// ```
    pub fn encode(&self, mut buf: impl BufMut) {
        buf.put_u32_le(self.offset);
        buf.put_u32_le(self.len);
        buf.put_u32_le(self.uncompressed_size);
        buf.put_u32_le(self.total_key_count);
        buf.put_u32_le(self.stale_key_count);
        put_length_prefixed_slice(buf, &self.smallest_key);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = buf.get_u32_le();
        let stale_key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = 0;
        let stale_key_count = 0;
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        24 /* offset + len + key len + uncompressed size + total key count + stale key count */ + self.smallest_key.len()
    }

    pub fn table_id(&self) -> TableId {
        FullKey::decode(&self.smallest_key).user_key.table_id
    }
}

#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    pub block_metas: Vec<BlockMeta>,
    pub bloom_filter: Vec<u8>,
    pub estimated_size: u32,
    pub key_count: u32,
    pub smallest_key: Vec<u8>,
    pub largest_key: Vec<u8>,
    pub meta_offset: u64,
    /// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11. The delete ranges
    /// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`
    /// can be transformed into the events below:
    /// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
    /// -epoch3> }`
    /// From these we can derive monotonic events (ordered by user key):
    /// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
    /// which means that the delete range of [0, wmk1) is epoch1, the delete range of [wmk1, wmk2)
    /// is epoch2, etc. In this example, at the event key wmk1 (5) the delete range changes from
    /// epoch1 to epoch2, so the `new_epoch` is epoch2. epoch2 applies from the event key wmk1 (5)
    /// up to the next event key wmk2 (7) (exclusive).
    /// If there are no range deletes between the current event key and the next one, `new_epoch`
    /// is `HummockEpoch::MAX`.
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    /// Format version, for forward compatibility.
    pub version: u32,
}

impl SstableMeta {
    /// Format:
    ///
    /// ```plain
    /// | N (4B) |
    /// | block meta 0 | ... | block meta N-1 |
    /// | bloom filter len (4B) | bloom filter |
    /// | estimated size (4B) | key count (4B) |
    /// | smallest key len (4B) | smallest key |
    /// | largest key len (4B) | largest key |
    /// | K (4B) |
    /// | tombstone-event 0 | ... | tombstone-event K-1 |
    /// | file offset of this meta block (8B) |
    /// | checksum (8B) | version (4B) | magic (4B) |
    /// ```
    pub fn encode_to_bytes(&self) -> Vec<u8> {
        let encoded_size = self.encoded_size();
        let mut buf = Vec::with_capacity(encoded_size);
        self.encode_to(&mut buf);
        buf
    }

    pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
        let start = buf.as_ref().len();

        buf.put_u32_le(
            utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "overflow: cannot convert block_metas.len() {} into u32, table {}",
                    self.block_metas.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        for block_meta in &self.block_metas {
            block_meta.encode(&mut buf);
        }
        put_length_prefixed_slice(&mut buf, &self.bloom_filter);
        buf.put_u32_le(self.estimated_size);
        buf.put_u32_le(self.key_count);
        put_length_prefixed_slice(&mut buf, &self.smallest_key);
        put_length_prefixed_slice(&mut buf, &self.largest_key);
        #[expect(deprecated)]
        buf.put_u32_le(
            utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "overflow: cannot convert monotonic_tombstone_events.len() {} into u32, table {}",
                    self.monotonic_tombstone_events.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        #[expect(deprecated)]
        for monotonic_tombstone_event in &self.monotonic_tombstone_events {
            monotonic_tombstone_event.encode(&mut buf);
        }
        buf.put_u64_le(self.meta_offset);

        let end = buf.as_ref().len();

        let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
        buf.put_u64_le(checksum);
        buf.put_u32_le(VERSION);
        buf.put_u32_le(MAGIC);
    }

    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
        let mut cursor = buf.len();

        cursor -= 4;
        let magic = (&buf[cursor..]).get_u32_le();
        if magic != MAGIC {
            return Err(HummockError::magic_mismatch(MAGIC, magic));
        }

        cursor -= 4;
        let version = (&buf[cursor..cursor + 4]).get_u32_le();
        if version != VERSION && version != OLD_VERSION {
            return Err(HummockError::invalid_format_version(version));
        }

        cursor -= 8;
        let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
        let buf = &mut &buf[..cursor];
        xxhash64_verify(buf, checksum)?;

        let block_meta_count = buf.get_u32_le() as usize;
        let mut block_metas = Vec::with_capacity(block_meta_count);
        if version == OLD_VERSION {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode_from_v1(buf));
            }
        } else {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode(buf));
            }
        }

        let bloom_filter = get_length_prefixed_slice(buf);
        let estimated_size = buf.get_u32_le();
        let key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        let largest_key = get_length_prefixed_slice(buf);
        let tomb_event_count = buf.get_u32_le() as usize;
        let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
        for _ in 0..tomb_event_count {
            let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
            monotonic_tombstone_events.push(monotonic_tombstone_event);
        }
        let meta_offset = buf.get_u64_le();

        if !monotonic_tombstone_events.is_empty() {
            warn!(
                count = monotonic_tombstone_events.len(),
                tables = ?monotonic_tombstone_events
                    .iter()
                    .map(|event| event.event_key.left_user_key.table_id)
                    .collect::<HashSet<_>>(),
                "read non-empty range tombstones");
        }

        #[expect(deprecated)]
        Ok(Self {
            block_metas,
            bloom_filter,
            estimated_size,
            key_count,
            smallest_key,
            largest_key,
            meta_offset,
            monotonic_tombstone_events,
            version,
        })
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        4 // block meta count
            + self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.encoded_size())
            .sum::<usize>()
            + 4 // monotonic tombstone events len
            + 4 // bloom filter len
            + self.bloom_filter.len()
            + 4 // estimated size
            + 4 // key count
            + 4 // smallest key len
            + self.smallest_key.len()
            + 4 // largest key len
            + self.largest_key.len()
            + 8 // meta offset
            + 8 // checksum
            + 4 // version
            + 4 // magic
    }
}

#[derive(Default)]
pub struct SstableIteratorReadOptions {
    pub cache_policy: CachePolicy,
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    pub max_preload_retry_times: usize,
    pub prefetch_for_large_query: bool,
}

impl SstableIteratorReadOptions {
    pub fn from_read_options(read_options: &ReadOptions) -> Self {
        Self {
            cache_policy: read_options.cache_policy,
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: read_options.prefetch_options.for_large_query,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

    #[test]
    fn test_sstable_meta_enc_dec() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![
                BlockMeta {
                    smallest_key: b"0-smallest-key".to_vec(),
                    len: 100,
                    ..Default::default()
                },
                BlockMeta {
                    smallest_key: b"5-some-key".to_vec(),
                    offset: 100,
                    len: 100,
                    ..Default::default()
                },
            ],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let sz = meta.encoded_size();
        let buf = meta.encode_to_bytes();
        assert_eq!(sz, buf.len());
        let decoded_meta = SstableMeta::decode(&buf[..]).unwrap();
        assert_eq!(decoded_meta, meta);

        println!("buf: {}", buf.len());
    }
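
    // A minimal round-trip sketch for the `BlockMeta` codec (an illustrative addition, not part
    // of the original test suite): encode one meta with the v2 layout, then check that
    // `encoded_size` and `decode` agree with the encoder. The field values are arbitrary.
    #[test]
    fn test_block_meta_enc_dec() {
        let block_meta = BlockMeta {
            smallest_key: b"0-smallest-key".to_vec(),
            offset: 100,
            len: 100,
            uncompressed_size: 0,
            total_key_count: 10,
            stale_key_count: 3,
        };
        let mut buf = Vec::new();
        block_meta.encode(&mut buf);
        assert_eq!(buf.len(), block_meta.encoded_size());
        assert_eq!(BlockMeta::decode(&mut &buf[..]), block_meta);
    }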
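    // A hedged sketch of the failure path in `SstableMeta::decode`: a tampered trailing magic
    // number must be rejected before any field is parsed. The meta field values are arbitrary;
    // only the footer matters here.
    #[test]
    fn test_sstable_meta_rejects_bad_magic() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![],
            bloom_filter: vec![],
            estimated_size: 0,
            key_count: 0,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 0,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let mut buf = meta.encode_to_bytes();
        // Overwrite the last 4 bytes (the magic) with zeros.
        let magic_pos = buf.len() - 4;
        buf[magic_pos..].copy_from_slice(&0u32.to_le_bytes());
        assert!(SstableMeta::decode(&buf[..]).is_err());
    }
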
    #[tokio::test]
    async fn test_sstable_serde() {
        let (_, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            (0..100).map(|x| {
                (
                    iterator_test_key_of(x),
                    HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec()),
                )
            }),
        )
        .await;

        let buffer = bincode::serialize(&meta).unwrap();

        let m: SstableMeta = bincode::deserialize(&buffer).unwrap();

        assert_eq!(meta, m);

        println!("{} vs {}", buffer.len(), meta.encoded_size());
    }
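
    // Sketch of the bloom-filter hashing convention (illustrative; the exact hash values are not
    // a stability guarantee): because the table id is XOR-ed into the xxHash of the dist key,
    // identical keys belonging to different tables get different filter hashes.
    #[test]
    fn test_bloom_filter_hash_is_table_aware() {
        let dist_key = b"dist-key";
        assert_ne!(
            Sstable::hash_for_bloom_filter(dist_key, 1),
            Sstable::hash_for_bloom_filter(dist_key, 2),
        );
        assert_ne!(
            Sstable::hash_for_bloom_filter_u32(dist_key, 1),
            Sstable::hash_for_bloom_filter_u32(dist_key, 2),
        );
    }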
}