risingwave_storage/hummock/sstable/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Hummock state store's SST builder, format and iterator

// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
mod block;

use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::ops::{BitXor, Bound, Range};

pub use block::*;
mod block_iterator;
pub use block_iterator::*;
mod bloom;
mod xor_filter;
pub use bloom::BloomFilterBuilder;
use serde::{Deserialize, Serialize};
pub use xor_filter::{
    BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder, XorFilterReader,
};
pub mod builder;
pub use builder::*;
pub mod writer;
use risingwave_common::catalog::TableId;
pub use writer::*;
mod forward_sstable_iterator;
pub mod multi_builder;
use bytes::{Buf, BufMut};
pub use forward_sstable_iterator::*;
use tracing::warn;
mod backward_sstable_iterator;
pub use backward_sstable_iterator::*;
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};

mod filter;
mod utils;

pub use filter::FilterBuilder;
pub use utils::CompressionAlgorithm;
use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
use xxhash_rust::{xxh32, xxh64};

use self::utils::{xxhash64_checksum, xxhash64_verify};
use super::{HummockError, HummockResult};
use crate::hummock::CachePolicy;
use crate::store::ReadOptions;

const MAGIC: u32 = 0x5785ab73;
const OLD_VERSION: u32 = 1;
const VERSION: u32 = 2;
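// Format v1 (`OLD_VERSION`) encoded block metas without per-block key counts;
// `BlockMeta::decode_from_v1` below fills `total_key_count` and
// `stale_key_count` with zero when reading such metadata.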

/// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11, and the delete ranges are
/// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`.
/// They can be transformed into the events below:
/// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
/// -epoch3> }`
/// From these we can derive monotonic events (ordered by user key):
/// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
/// which means that the delete range of [0, wmk1) is epoch1, the delete range of [wmk1, wmk2) is
/// epoch2, etc. In this example, at the event key wmk1 (5), the delete range changes from epoch1
/// to epoch2, so the `new epoch` is epoch2. epoch2 is in effect from the event key wmk1 (5) up to
/// the next event key wmk2 (7) (exclusive).
/// If there are no range deletes between the current event key and the next event key,
/// `new_epoch` will be `HummockEpoch::MAX`.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    pub new_epoch: HummockEpoch,
}

impl MonotonicDeleteEvent {
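    /// Format (as written by `encode` below):
    ///
    /// ```plain
    /// | left user key (length-prefixed) | exclude-left flag (1B) | new epoch (8B, LE) |
    /// ```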
    pub fn encode(&self, mut buf: impl BufMut) {
        self.event_key
            .left_user_key
            .encode_length_prefixed(&mut buf);
        buf.put_u8(if self.event_key.is_exclude_left_key {
            1
        } else {
            0
        });
        buf.put_u64_le(self.new_epoch);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
        let user_key = UserKey::decode_length_prefixed(buf);
        let exclude_left_key_flag = buf.get_u8();
        let is_exclude_left_key = match exclude_left_key_flag {
            0 => false,
            1 => true,
            _ => panic!("exclusive flag should be either 0 or 1"),
        };
        let new_epoch = buf.get_u64_le();
        Self {
            event_key: PointRange {
                left_user_key: user_key,
                is_exclude_left_key,
            },
            new_epoch,
        }
    }
}

#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    id: HummockSstableObjectId,
    meta: SstableMeta,
}

impl From<SerdeSstable> for Sstable {
    fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
        Sstable::new(id, meta)
    }
}

/// [`Sstable`] is a handle for accessing an SST.
#[derive(Clone, Serialize, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    pub id: HummockSstableObjectId,
    pub meta: SstableMeta,
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
}

impl Debug for Sstable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sstable")
            .field("id", &self.id)
            .field("meta", &self.meta)
            .finish()
    }
}

impl Sstable {
    pub fn new(id: HummockSstableObjectId, mut meta: SstableMeta) -> Self {
        let filter_data = std::mem::take(&mut meta.bloom_filter);
        let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
        Self {
            id,
            meta,
            filter_reader,
        }
    }

    #[inline(always)]
    pub fn has_bloom_filter(&self) -> bool {
        !self.filter_reader.is_empty()
    }

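    /// Returns the in-object byte range of the block at `block_index`, together
    /// with the capacity required to hold its uncompressed payload.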
    pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
        let block_meta = &self.meta.block_metas[block_index];
        let range =
            block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
        let uncompressed_capacity = block_meta.uncompressed_size as usize;
        (range, uncompressed_capacity)
    }

    #[inline(always)]
    pub fn hash_for_bloom_filter_u32(dist_key: &[u8], table_id: u32) -> u32 {
        let dist_key_hash = xxh32::xxh32(dist_key, 0);
        // XOR in the table id so that identical dist keys from different tables
        // produce distinct filter hashes.
        table_id.bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn hash_for_bloom_filter(dist_key: &[u8], table_id: u32) -> u64 {
        let dist_key_hash = xxh64::xxh64(dist_key, 0);
        // XOR in the table id so that identical dist keys from different tables
        // produce distinct filter hashes.
        (table_id as u64).bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
        self.filter_reader.may_match(user_key_range, hash)
    }
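
    // A minimal usage sketch of the filter path (hypothetical caller code;
    // `dist_key`, `table_id`, `user_key_range` and `sstable` are placeholders):
    // hash the distribution key once, then consult the filter before fetching
    // any blocks. A `true` answer may be a false positive; `false` is definitive.
    //
    //     let hash = Sstable::hash_for_bloom_filter(dist_key, table_id);
    //     if sstable.may_match_hash(&user_key_range, hash) {
    //         // proceed to read the candidate blocks
    //     }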

    #[inline(always)]
    pub fn block_count(&self) -> usize {
        self.meta.block_metas.len()
    }

    #[inline(always)]
    pub fn estimate_size(&self) -> usize {
        8 /* id */ + self.filter_reader.estimate_size() + self.meta.encoded_size()
    }
}

#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    pub smallest_key: Vec<u8>,
    pub offset: u32,
    pub len: u32,
    pub uncompressed_size: u32,
    pub total_key_count: u32,
    pub stale_key_count: u32,
}

impl BlockMeta {
    /// Format:
    ///
    /// ```plain
    /// | offset (4B) | len (4B) | uncompressed size (4B) | total key count (4B)
    /// | stale key count (4B) | smallest key len (4B) | smallest key |
    /// ```
    pub fn encode(&self, mut buf: impl BufMut) {
        buf.put_u32_le(self.offset);
        buf.put_u32_le(self.len);
        buf.put_u32_le(self.uncompressed_size);
        buf.put_u32_le(self.total_key_count);
        buf.put_u32_le(self.stale_key_count);
        put_length_prefixed_slice(buf, &self.smallest_key);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = buf.get_u32_le();
        let stale_key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = 0;
        let stale_key_count = 0;
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        24 /* offset + len + key len + uncompressed size + total key count + stale key count */ + self.smallest_key.len()
    }

    pub fn table_id(&self) -> TableId {
        FullKey::decode(&self.smallest_key).user_key.table_id
    }
}

#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    pub block_metas: Vec<BlockMeta>,
    pub bloom_filter: Vec<u8>,
    pub estimated_size: u32,
    pub key_count: u32,
    pub smallest_key: Vec<u8>,
    pub largest_key: Vec<u8>,
    pub meta_offset: u64,
    /// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11, and the delete ranges are
    /// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`.
    /// They can be transformed into the events below:
    /// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
    /// -epoch3> }`
    /// From these we can derive monotonic events (ordered by user key):
    /// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
    /// which means that the delete range of [0, wmk1) is epoch1, the delete range of
    /// [wmk1, wmk2) is epoch2, etc. In this example, at the event key wmk1 (5), the delete
    /// range changes from epoch1 to epoch2, so the `new epoch` is epoch2. epoch2 is in effect
    /// from the event key wmk1 (5) up to the next event key wmk2 (7) (exclusive).
    /// If there are no range deletes between the current event key and the next event key,
    /// `new_epoch` will be `HummockEpoch::MAX`.
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    /// Format version, for further compatibility.
    pub version: u32,
}

impl SstableMeta {
    /// Format:
    ///
    /// ```plain
    /// | N (4B) |
    /// | block meta 0 | ... | block meta N-1 |
    /// | bloom filter len (4B) | bloom filter |
    /// | estimated size (4B) | key count (4B) |
    /// | smallest key len (4B) | smallest key |
    /// | largest key len (4B) | largest key |
    /// | K (4B) |
    /// | tombstone-event 0 | ... | tombstone-event K-1 |
    /// | file offset of this meta block (8B) |
    /// | checksum (8B) | version (4B) | magic (4B) |
    /// ```
    pub fn encode_to_bytes(&self) -> Vec<u8> {
        let encoded_size = self.encoded_size();
        let mut buf = Vec::with_capacity(encoded_size);
        self.encode_to(&mut buf);
        buf
    }

    pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
        let start = buf.as_ref().len();

        buf.put_u32_le(
            utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "overflow: cannot convert block_metas.len() {} into u32, table {}",
                    self.block_metas.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        for block_meta in &self.block_metas {
            block_meta.encode(&mut buf);
        }
        put_length_prefixed_slice(&mut buf, &self.bloom_filter);
        buf.put_u32_le(self.estimated_size);
        buf.put_u32_le(self.key_count);
        put_length_prefixed_slice(&mut buf, &self.smallest_key);
        put_length_prefixed_slice(&mut buf, &self.largest_key);
        #[expect(deprecated)]
        buf.put_u32_le(
            utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "overflow: cannot convert monotonic_tombstone_events.len() {} into u32, table {}",
                    self.monotonic_tombstone_events.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        #[expect(deprecated)]
        for monotonic_tombstone_event in &self.monotonic_tombstone_events {
            monotonic_tombstone_event.encode(&mut buf);
        }
        buf.put_u64_le(self.meta_offset);

        let end = buf.as_ref().len();

        let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
        buf.put_u64_le(checksum);
        buf.put_u32_le(VERSION);
        buf.put_u32_le(MAGIC);
    }

    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
        let mut cursor = buf.len();

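        // The footer is parsed back-to-front: the magic number (4B), the format
        // version (4B), and the checksum (8B) sit at the tail of the buffer, and
        // the checksum covers everything that precedes it.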
        cursor -= 4;
        let magic = (&buf[cursor..]).get_u32_le();
        if magic != MAGIC {
            return Err(HummockError::magic_mismatch(MAGIC, magic));
        }

        cursor -= 4;
        let version = (&buf[cursor..cursor + 4]).get_u32_le();
        if version != VERSION && version != OLD_VERSION {
            return Err(HummockError::invalid_format_version(version));
        }

        cursor -= 8;
        let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
        let buf = &mut &buf[..cursor];
        xxhash64_verify(buf, checksum)?;

        let block_meta_count = buf.get_u32_le() as usize;
        let mut block_metas = Vec::with_capacity(block_meta_count);
        if version == OLD_VERSION {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode_from_v1(buf));
            }
        } else {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode(buf));
            }
        }

        let bloom_filter = get_length_prefixed_slice(buf);
        let estimated_size = buf.get_u32_le();
        let key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        let largest_key = get_length_prefixed_slice(buf);
        let tomb_event_count = buf.get_u32_le() as usize;
        let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
        for _ in 0..tomb_event_count {
            let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
            monotonic_tombstone_events.push(monotonic_tombstone_event);
        }
        let meta_offset = buf.get_u64_le();

        if !monotonic_tombstone_events.is_empty() {
            warn!(
                count = monotonic_tombstone_events.len(),
                tables = ?monotonic_tombstone_events
                    .iter()
                    .map(|event| event.event_key.left_user_key.table_id)
                    .collect::<HashSet<_>>(),
                "read non-empty range tombstones");
        }

        #[expect(deprecated)]
        Ok(Self {
            block_metas,
            bloom_filter,
            estimated_size,
            key_count,
            smallest_key,
            largest_key,
            meta_offset,
            monotonic_tombstone_events,
            version,
        })
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        4 // block meta count
            + self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.encoded_size())
            .sum::<usize>()
            + 4 // monotonic tombstone events len
            + 4 // bloom filter len
            + self.bloom_filter.len()
            + 4 // estimated size
            + 4 // key count
            + 4 // key len
            + self.smallest_key.len()
            + 4 // key len
            + self.largest_key.len()
            + 8 // meta offset
            + 8 // checksum
            + 4 // version
            + 4 // magic
    }
}

#[derive(Default)]
pub struct SstableIteratorReadOptions {
    pub cache_policy: CachePolicy,
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    pub max_preload_retry_times: usize,
    pub prefetch_for_large_query: bool,
}

impl SstableIteratorReadOptions {
    pub fn from_read_options(read_options: &ReadOptions) -> Self {
        Self {
            cache_policy: read_options.cache_policy,
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: read_options.prefetch_options.for_large_query,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

    #[test]
    fn test_sstable_meta_enc_dec() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![
                BlockMeta {
                    smallest_key: b"0-smallest-key".to_vec(),
                    len: 100,
                    ..Default::default()
                },
                BlockMeta {
                    smallest_key: b"5-some-key".to_vec(),
                    offset: 100,
                    len: 100,
                    ..Default::default()
                },
            ],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let sz = meta.encoded_size();
        let buf = meta.encode_to_bytes();
        assert_eq!(sz, buf.len());
        let decoded_meta = SstableMeta::decode(&buf[..]).unwrap();
        assert_eq!(decoded_meta, meta);

        println!("buf: {}", buf.len());
    }
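
    // A minimal round-trip sketch for a single `BlockMeta`, checking that
    // `encode`, `decode` and `encoded_size` agree with each other. The field
    // values are arbitrary.
    #[test]
    fn test_block_meta_enc_dec() {
        let block_meta = BlockMeta {
            smallest_key: b"0-smallest-key".to_vec(),
            offset: 0,
            len: 100,
            uncompressed_size: 120,
            total_key_count: 10,
            stale_key_count: 1,
        };
        let mut buf = vec![];
        block_meta.encode(&mut buf);
        assert_eq!(buf.len(), block_meta.encoded_size());
        let decoded = BlockMeta::decode(&mut &buf[..]);
        assert_eq!(decoded, block_meta);
    }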

    #[tokio::test]
    async fn test_sstable_serde() {
        let (_, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            (0..100).map(|x| {
                (
                    iterator_test_key_of(x),
                    HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec()),
                )
            }),
        )
        .await;

        let buffer = bincode::serialize(&meta).unwrap();

        let m: SstableMeta = bincode::deserialize(&buffer).unwrap();

        assert_eq!(meta, m);

        println!("{} vs {}", buffer.len(), meta.encoded_size());
    }
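
    // A small sanity check (not part of the original suite): the table id is
    // XOR-ed into the key hash, so the same dist key under different table ids
    // must produce different filter hashes, while equal inputs stay deterministic.
    #[test]
    fn test_bloom_filter_hash_mixes_table_id() {
        let h1 = Sstable::hash_for_bloom_filter(b"dist-key", 1);
        let h2 = Sstable::hash_for_bloom_filter(b"dist-key", 2);
        assert_ne!(h1, h2);
        assert_eq!(h1, Sstable::hash_for_bloom_filter(b"dist-key", 1));
    }

    // A negative-path sketch: corrupting the trailing magic number should make
    // `SstableMeta::decode` return an error instead of garbage.
    #[test]
    fn test_sstable_meta_decode_bad_magic() {
        let mut buf = SstableMeta::default().encode_to_bytes();
        let last = buf.len() - 1;
        buf[last] ^= 0xff; // flip bits in the magic number's highest byte
        assert!(SstableMeta::decode(&buf[..]).is_err());
    }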
}