risingwave_storage/hummock/sstable/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Hummock state store's SST builder, format and iterator

// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
mod block;

use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::ops::{BitXor, Bound, Range};

pub use block::*;
mod block_iterator;
pub use block_iterator::*;
mod bloom;
mod xor_filter;
pub use bloom::BloomFilterBuilder;
use serde::{Deserialize, Serialize};
pub use xor_filter::{
    BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder, XorFilterReader,
};
pub mod builder;
pub use builder::*;
pub mod writer;
use risingwave_common::catalog::TableId;
pub use writer::*;
mod forward_sstable_iterator;
pub mod multi_builder;
use bytes::{Buf, BufMut};
pub use forward_sstable_iterator::*;
use tracing::warn;
mod backward_sstable_iterator;
pub use backward_sstable_iterator::*;
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};

mod filter;
mod utils;

pub use filter::FilterBuilder;
pub use utils::{CompressionAlgorithm, xxhash64_checksum, xxhash64_verify};
use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
use xxhash_rust::{xxh32, xxh64};

use super::{HummockError, HummockResult};
use crate::hummock::CachePolicy;
use crate::store::ReadOptions;

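// Footer magic and format versions of the SST meta block; written in
// `SstableMeta::encode_to` and checked in `SstableMeta::decode`.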
const MAGIC: u32 = 0x5785ab73;
const OLD_VERSION: u32 = 1;
const VERSION: u32 = 2;

/// Assume that watermark1 is 5, watermark2 is 7, and watermark3 is 11. The delete ranges
/// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`
/// can be transformed into the events below:
/// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
/// -epoch3> }`
/// From these we can derive monotonic events (ordered by user key):
/// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
/// which means that the delete epoch of range [0, wmk1) is epoch1, that of [wmk1, wmk2) is
/// epoch2, and so on. In this example, at the event key wmk1 (5) the delete epoch changes from
/// epoch1 to epoch2, so the `new epoch` is epoch2. epoch2 applies from the event key wmk1 (5)
/// up to, but not including, the next event key wmk2 (7).
/// If there are no range deletes between the current event key and the next one, `new_epoch` is
/// `HummockEpoch::MAX`.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    pub new_epoch: HummockEpoch,
}

impl MonotonicDeleteEvent {
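    /// Format, mirroring the writes in `encode` below:
    ///
    /// ```plain
    /// | length-prefixed user key | exclude-left flag (1B) | new epoch (8B, LE) |
    /// ```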
    pub fn encode(&self, mut buf: impl BufMut) {
        self.event_key
            .left_user_key
            .encode_length_prefixed(&mut buf);
        buf.put_u8(if self.event_key.is_exclude_left_key {
            1
        } else {
            0
        });
        buf.put_u64_le(self.new_epoch);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
        let user_key = UserKey::decode_length_prefixed(buf);
        let exclude_left_key_flag = buf.get_u8();
        let is_exclude_left_key = match exclude_left_key_flag {
            0 => false,
            1 => true,
            _ => panic!("exclusive flag should be either 0 or 1"),
        };
        let new_epoch = buf.get_u64_le();
        Self {
            event_key: PointRange {
                left_user_key: user_key,
                is_exclude_left_key,
            },
            new_epoch,
        }
    }
}

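/// Serde proxy for [`Sstable`]: `filter_reader` is marked `#[serde(skip)]` on
/// [`Sstable`], so deserialization goes through [`Sstable::new`] (via the
/// `serde(from = ...)` attribute) to rebuild the filter reader from the meta.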
#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    id: HummockSstableObjectId,
    meta: SstableMeta,
}

impl From<SerdeSstable> for Sstable {
    fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
        Sstable::new(id, meta)
    }
}

/// [`Sstable`] is a handle for accessing SST.
#[derive(Clone, Serialize, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    pub id: HummockSstableObjectId,
    pub meta: SstableMeta,
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
}

impl Debug for Sstable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sstable")
            .field("id", &self.id)
            .field("meta", &self.meta)
            .finish()
    }
}

impl Sstable {
    pub fn new(id: HummockSstableObjectId, mut meta: SstableMeta) -> Self {
        let filter_data = std::mem::take(&mut meta.bloom_filter);
        let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
        Self {
            id,
            meta,
            filter_reader,
        }
    }

    #[inline(always)]
    pub fn has_bloom_filter(&self) -> bool {
        !self.filter_reader.is_empty()
    }

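    /// Returns the in-object byte range of block `block_index` and the block's
    /// uncompressed capacity.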
    pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
        let block_meta = &self.meta.block_metas[block_index];
        let range =
            block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
        let uncompressed_capacity = block_meta.uncompressed_size as usize;
        (range, uncompressed_capacity)
    }

    #[inline(always)]
    pub fn hash_for_bloom_filter_u32(dist_key: &[u8], table_id: u32) -> u32 {
        let dist_key_hash = xxh32::xxh32(dist_key, 0);
        // XOR with the table id so that identical dist keys in different tables
        // hash to different filter values.
        table_id.bitxor(dist_key_hash)
    }

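    /// A minimal usage sketch (variable names are hypothetical): hash the
    /// distribution key once, then probe the filter with `may_match_hash`.
    ///
    /// ```ignore
    /// let hash = Sstable::hash_for_bloom_filter(dist_key, table_id);
    /// if sstable.may_match_hash(&user_key_range, hash) {
    ///     // The key may exist in this SST; fall through to a block read.
    /// }
    /// ```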
    #[inline(always)]
    pub fn hash_for_bloom_filter(dist_key: &[u8], table_id: u32) -> u64 {
        let dist_key_hash = xxh64::xxh64(dist_key, 0);
        // XOR with the table id so that identical dist keys in different tables
        // hash to different filter values.
        (table_id as u64).bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
        self.filter_reader.may_match(user_key_range, hash)
    }

    #[inline(always)]
    pub fn block_count(&self) -> usize {
        self.meta.block_metas.len()
    }

    #[inline(always)]
    pub fn estimate_size(&self) -> usize {
        8 /* id */ + self.filter_reader.estimate_size() + self.meta.encoded_size()
    }
}

#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    pub smallest_key: Vec<u8>,
    pub offset: u32,
    pub len: u32,
    pub uncompressed_size: u32,
    pub total_key_count: u32,
    pub stale_key_count: u32,
}

impl BlockMeta {
    /// Format:
    ///
    /// ```plain
    /// | offset (4B) | len (4B) | uncompressed size (4B) | total key count (4B)
    /// | stale key count (4B) | smallest key len (4B) | smallest key |
    /// ```
    pub fn encode(&self, mut buf: impl BufMut) {
        buf.put_u32_le(self.offset);
        buf.put_u32_le(self.len);
        buf.put_u32_le(self.uncompressed_size);
        buf.put_u32_le(self.total_key_count);
        buf.put_u32_le(self.stale_key_count);
        put_length_prefixed_slice(buf, &self.smallest_key);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = buf.get_u32_le();
        let stale_key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

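    /// Decodes a v1 (`OLD_VERSION`) block meta, which predates the
    /// `total_key_count` and `stale_key_count` fields; both default to 0.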
    pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = 0;
        let stale_key_count = 0;
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        24 /* offset + len + key len + uncompressed size + total key count + stale key count */ + self.smallest_key.len()
    }

    pub fn table_id(&self) -> TableId {
        FullKey::decode(&self.smallest_key).user_key.table_id
    }
}

#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    pub block_metas: Vec<BlockMeta>,
    pub bloom_filter: Vec<u8>,
    pub estimated_size: u32,
    pub key_count: u32,
    pub smallest_key: Vec<u8>,
    pub largest_key: Vec<u8>,
    pub meta_offset: u64,
    /// Assume that watermark1 is 5, watermark2 is 7, and watermark3 is 11. The delete ranges
    /// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`
    /// can be transformed into the events below:
    /// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
    /// -epoch3> }`
    /// From these we can derive monotonic events (ordered by user key):
    /// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
    /// which means that the delete epoch of range [0, wmk1) is epoch1, that of [wmk1, wmk2) is
    /// epoch2, and so on. In this example, at the event key wmk1 (5) the delete epoch changes
    /// from epoch1 to epoch2, so the `new epoch` is epoch2. epoch2 applies from the event key
    /// wmk1 (5) up to, but not including, the next event key wmk2 (7).
    /// If there are no range deletes between the current event key and the next one, `new_epoch`
    /// is `HummockEpoch::MAX`.
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    /// Format version, for future compatibility.
    pub version: u32,
}

impl SstableMeta {
    /// Format:
    ///
    /// ```plain
    /// | N (4B) |
    /// | block meta 0 | ... | block meta N-1 |
    /// | bloom filter len (4B) | bloom filter |
    /// | estimated size (4B) | key count (4B) |
    /// | smallest key len (4B) | smallest key |
    /// | largest key len (4B) | largest key |
    /// | K (4B) |
    /// | tombstone-event 0 | ... | tombstone-event K-1 |
    /// | file offset of this meta block (8B) |
    /// | checksum (8B) | version (4B) | magic (4B) |
    /// ```
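    ///
    /// A minimal round-trip sketch (the same flow as `test_sstable_meta_enc_dec`
    /// in the tests below):
    ///
    /// ```ignore
    /// let buf = meta.encode_to_bytes();
    /// let decoded = SstableMeta::decode(&buf[..]).unwrap();
    /// assert_eq!(decoded, meta);
    /// ```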
    pub fn encode_to_bytes(&self) -> Vec<u8> {
        let encoded_size = self.encoded_size();
        let mut buf = Vec::with_capacity(encoded_size);
        self.encode_to(&mut buf);
        buf
    }

    pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
        let start = buf.as_ref().len();

        buf.put_u32_le(
            utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "overflow: cannot convert block_metas len {} into u32, table {}",
                    self.block_metas.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        for block_meta in &self.block_metas {
            block_meta.encode(&mut buf);
        }
        put_length_prefixed_slice(&mut buf, &self.bloom_filter);
        buf.put_u32_le(self.estimated_size);
        buf.put_u32_le(self.key_count);
        put_length_prefixed_slice(&mut buf, &self.smallest_key);
        put_length_prefixed_slice(&mut buf, &self.largest_key);
        #[expect(deprecated)]
        buf.put_u32_le(
            utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "overflow: cannot convert monotonic_tombstone_events len {} into u32, table {}",
                    self.monotonic_tombstone_events.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        #[expect(deprecated)]
        for monotonic_tombstone_event in &self.monotonic_tombstone_events {
            monotonic_tombstone_event.encode(&mut buf);
        }
        buf.put_u64_le(self.meta_offset);

        let end = buf.as_ref().len();

        let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
        buf.put_u64_le(checksum);
        buf.put_u32_le(VERSION);
        buf.put_u32_le(MAGIC);
    }

    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
        let mut cursor = buf.len();

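        // The footer is parsed back-to-front: magic (4B), then version (4B),
        // then checksum (8B); the checksum covers everything before it.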
        cursor -= 4;
        let magic = (&buf[cursor..]).get_u32_le();
        if magic != MAGIC {
            return Err(HummockError::magic_mismatch(MAGIC, magic));
        }

        cursor -= 4;
        let version = (&buf[cursor..cursor + 4]).get_u32_le();
        if version != VERSION && version != OLD_VERSION {
            return Err(HummockError::invalid_format_version(version));
        }

        cursor -= 8;
        let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
        let buf = &mut &buf[..cursor];
        xxhash64_verify(buf, checksum)?;

        let block_meta_count = buf.get_u32_le() as usize;
        let mut block_metas = Vec::with_capacity(block_meta_count);
        if version == OLD_VERSION {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode_from_v1(buf));
            }
        } else {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode(buf));
            }
        }

        let bloom_filter = get_length_prefixed_slice(buf);
        let estimated_size = buf.get_u32_le();
        let key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        let largest_key = get_length_prefixed_slice(buf);
        let tomb_event_count = buf.get_u32_le() as usize;
        let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
        for _ in 0..tomb_event_count {
            let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
            monotonic_tombstone_events.push(monotonic_tombstone_event);
        }
        let meta_offset = buf.get_u64_le();

        if !monotonic_tombstone_events.is_empty() {
            warn!(
                count = monotonic_tombstone_events.len(),
                tables = ?monotonic_tombstone_events
                    .iter()
                    .map(|event| event.event_key.left_user_key.table_id)
                    .collect::<HashSet<_>>(),
                "read non-empty range tombstones");
        }

        #[expect(deprecated)]
        Ok(Self {
            block_metas,
            bloom_filter,
            estimated_size,
            key_count,
            smallest_key,
            largest_key,
            meta_offset,
            monotonic_tombstone_events,
            version,
        })
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        4 // block meta count
            + self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.encoded_size())
            .sum::<usize>()
            + 4 // monotonic tombstone events len
            + 4 // bloom filter len
            + self.bloom_filter.len()
            + 4 // estimated size
            + 4 // key count
            + 4 // smallest key len
            + self.smallest_key.len()
            + 4 // largest key len
            + self.largest_key.len()
            + 8 // meta offset
            + 8 // checksum
            + 4 // version
            + 4 // magic
    }
}

#[derive(Default)]
pub struct SstableIteratorReadOptions {
    pub cache_policy: CachePolicy,
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    pub max_preload_retry_times: usize,
    pub prefetch_for_large_query: bool,
}

impl SstableIteratorReadOptions {
    pub fn from_read_options(read_options: &ReadOptions) -> Self {
        Self {
            cache_policy: read_options.cache_policy,
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: read_options.prefetch_options.for_large_query,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

    #[test]
    fn test_sstable_meta_enc_dec() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![
                BlockMeta {
                    smallest_key: b"0-smallest-key".to_vec(),
                    len: 100,
                    ..Default::default()
                },
                BlockMeta {
                    smallest_key: b"5-some-key".to_vec(),
                    offset: 100,
                    len: 100,
                    ..Default::default()
                },
            ],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let sz = meta.encoded_size();
        let buf = meta.encode_to_bytes();
        assert_eq!(sz, buf.len());
        let decoded_meta = SstableMeta::decode(&buf[..]).unwrap();
        assert_eq!(decoded_meta, meta);

        println!("buf: {}", buf.len());
    }

    #[tokio::test]
    async fn test_sstable_serde() {
        let (_, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            (0..100).map(|x| {
                (
                    iterator_test_key_of(x),
                    HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec()),
                )
            }),
        )
        .await;

        let buffer = bincode::serialize(&meta).unwrap();

        let m: SstableMeta = bincode::deserialize(&buffer).unwrap();

        assert_eq!(meta, m);

        println!("{} vs {}", buffer.len(), meta.encoded_size());
    }
}