risingwave_storage/hummock/sstable/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Hummock state store's SST builder, format and iterator
16
17// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
18mod block;
19
20use std::collections::HashSet;
21use std::fmt::{Debug, Formatter};
22use std::ops::{BitXor, Bound, Range};
23
24pub use block::*;
25mod block_iterator;
26pub use block_iterator::*;
27mod xor_filter;
28use serde::{Deserialize, Serialize};
29pub use xor_filter::{
30    BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder,
31    XorFilterReader,
32};
33pub mod builder;
34pub use builder::*;
35pub mod writer;
36use risingwave_common::catalog::TableId;
37pub use writer::*;
38mod forward_sstable_iterator;
39pub mod multi_builder;
40use bytes::{Buf, BufMut};
41pub use forward_sstable_iterator::*;
42use tracing::warn;
43mod backward_sstable_iterator;
44pub use backward_sstable_iterator::*;
45use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
46use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};
47
48mod filter;
49mod utils;
50
51pub use filter::{DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP, FilterBuilder, FilterBuilderOptions};
52pub use utils::{CompressionAlgorithm, xxhash64_checksum, xxhash64_verify};
53use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
54use xxhash_rust::xxh64;
55
56use super::{HummockError, HummockResult};
57use crate::hummock::CachePolicy;
58use crate::store::ReadOptions;
59
/// Magic number stored in the last 4 bytes of every encoded SST meta.
const MAGIC: u32 = 0x5785_ab73;
/// Legacy meta format whose block metas carry no key counts.
const OLD_VERSION: u32 = 1;
/// Meta format version written by the current encoder.
const VERSION: u32 = 2;
63
/// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11, delete ranges
/// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`
/// can be transformed into events below:
/// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
/// -epoch3> }`
/// Then we can get monotonic events (they are in order by user key) as below:
/// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
/// which means that delete range of [0, wmk1) is epoch1, delete range of [wmk1, wmk2) is epoch2,
/// etc. In this example, at the event key wmk1 (5), delete range changes from epoch1 to epoch2,
/// thus the `new epoch` is epoch2. epoch2 will be used from the event key wmk1 (5) and till the
/// next event key wmk2 (7) (not inclusive).
/// If there is no range deletes between current event key and next event key, `new_epoch` will be
/// `HummockEpoch::MAX`.
///
/// In this file the type is the element type of the deprecated
/// `SstableMeta::monotonic_tombstone_events`. Its binary meta-block encoding is defined by
/// `MonotonicDeleteEvent::encode`.
///
/// NOTE: the serde derive serializes fields in declaration order (e.g. for bincode), so do not
/// reorder the fields.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    /// Key at which this event takes effect. The `is_exclude_left_key` flag presumably records
    /// whether the key itself is excluded — confirm against `PointRange`.
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    /// Delete-range epoch in effect from `event_key` up to the next event key.
    pub new_epoch: HummockEpoch,
}
83
84impl MonotonicDeleteEvent {
85    pub fn encode(&self, mut buf: impl BufMut) {
86        self.event_key
87            .left_user_key
88            .encode_length_prefixed(&mut buf);
89        buf.put_u8(if self.event_key.is_exclude_left_key {
90            1
91        } else {
92            0
93        });
94        buf.put_u64_le(self.new_epoch);
95    }
96
97    pub fn decode(buf: &mut &[u8]) -> Self {
98        use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
99        let user_key = UserKey::decode_length_prefixed(buf);
100        let exclude_left_key_flag = buf.get_u8();
101        let is_exclude_left_key = match exclude_left_key_flag {
102            0 => false,
103            1 => true,
104            _ => panic!("exclusive flag should be either 0 or 1"),
105        };
106        let new_epoch = buf.get_u64_le();
107        Self {
108            event_key: PointRange {
109                left_user_key: user_key,
110                is_exclude_left_key,
111            },
112            new_epoch,
113        }
114    }
115}
116
/// Serde mirror of [`Sstable`]: only the object id and the meta are (de)serialized.
///
/// `Sstable` deserializes through this type (`#[serde(from = "SerdeSstable")]`) and serializes
/// into it in its hand-written `Serialize` impl.
///
/// NOTE: serde derives serialize fields in declaration order (e.g. for bincode), so reordering
/// these fields changes the serialized layout.
#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    id: HummockSstableObjectId,
    meta: SstableMeta,
}
122
123impl From<SerdeSstable> for Sstable {
124    fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
125        // Set skip_bloom_filter_in_serde to false because the behavior
126        // is determined by the serializer
127        Sstable::new(id, meta, false)
128    }
129}
130
/// [`Sstable`] is a handle for accessing SST.
///
/// Deserialization goes through [`SerdeSstable`] (`#[serde(from = "SerdeSstable")]`).
/// Serialization is implemented by hand so the filter can optionally be written back into
/// `meta.bloom_filter`.
#[derive(Clone, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    /// Object id of this SST.
    pub id: HummockSstableObjectId,
    /// Decoded meta. After `Sstable::new`, `meta.bloom_filter` is empty because its bytes
    /// were moved into `filter_reader`.
    pub meta: SstableMeta,
    /// Reader over the SST filter, built in `Sstable::new` from the meta's filter bytes.
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
    /// SST serde happens when an SST meta is written to meta disk cache.
    /// Excluding the SST filter from serde can reduce the meta disk cache entry size
    /// and reduce disk IO throughput at the cost of making the SST filter useless.
    /// Handles produced by deserialization always have this set to `false`.
    #[serde(skip)]
    skip_bloom_filter_in_serde: bool,
}
145
146impl Serialize for Sstable {
147    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
148    where
149        S: serde::Serializer,
150    {
151        let mut serde_sstable = SerdeSstable {
152            id: self.id,
153            meta: self.meta.clone(),
154        };
155        if !self.skip_bloom_filter_in_serde {
156            serde_sstable.meta.bloom_filter = self.filter_reader.encode_to_bytes();
157        }
158        serde_sstable.serialize(serializer)
159    }
160}
161
162impl Debug for Sstable {
163    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("Sstable")
165            .field("id", &self.id)
166            .field("meta", &self.meta)
167            .finish()
168    }
169}
170
171impl Sstable {
172    pub fn new(
173        id: HummockSstableObjectId,
174        mut meta: SstableMeta,
175        skip_bloom_filter_in_serde: bool,
176    ) -> Self {
177        let filter_data = std::mem::take(&mut meta.bloom_filter);
178        let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
179        Self {
180            id,
181            meta,
182            filter_reader,
183            skip_bloom_filter_in_serde,
184        }
185    }
186
187    #[inline(always)]
188    pub fn has_filter(&self) -> bool {
189        !self.filter_reader.is_empty()
190    }
191
192    pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
193        let block_meta = &self.meta.block_metas[block_index];
194        let range =
195            block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
196        let uncompressed_capacity = block_meta.uncompressed_size as usize;
197        (range, uncompressed_capacity)
198    }
199
200    #[inline(always)]
201    pub fn hash_for_filter(dist_key: &[u8], table_id: u32) -> u64 {
202        let dist_key_hash = xxh64::xxh64(dist_key, 0);
203        // congyi adds this because he aims to dedup keys in different tables
204        (table_id as u64).bitxor(dist_key_hash)
205    }
206
207    #[inline(always)]
208    pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
209        self.filter_reader
210            .may_match(&self.meta.block_metas, user_key_range, hash)
211    }
212
213    #[inline(always)]
214    pub fn block_count(&self) -> usize {
215        self.meta.block_metas.len()
216    }
217
218    #[inline(always)]
219    pub fn estimated_meta_cache_memory_weight(&self) -> usize {
220        // This is for foyer's in-memory cache weighter. The disk tier uses foyer `Code`
221        // serialization and `estimated_size` instead.
222        std::mem::size_of::<Self>()
223            + self.meta.estimated_heap_size()
224            + self.filter_reader.estimated_heap_size()
225    }
226}
227
/// Per-block index entry stored in `SstableMeta::block_metas`.
///
/// The binary meta-block encoding is defined by `BlockMeta::encode`. This type is also
/// (de)serialized with serde as part of an [`Sstable`]'s meta; serde derives write fields in
/// declaration order (e.g. for bincode), so do not reorder the fields.
#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    /// Smallest key of the block, encoded as a full key (`table_id` decodes it with
    /// `FullKey::decode`).
    pub smallest_key: Vec<u8>,
    /// Start of the block in the SST object; `Sstable::calculate_block_info` returns
    /// `offset..offset + len` as the block's byte range.
    pub offset: u32,
    /// Length in bytes of the stored block (presumably after compression — confirm in builder).
    pub len: u32,
    /// Size of the block once uncompressed.
    pub uncompressed_size: u32,
    /// Total number of keys in the block; 0 for metas decoded from the v1 format.
    pub total_key_count: u32,
    /// Number of stale keys in the block; 0 for metas decoded from the v1 format.
    pub stale_key_count: u32,
}
237
238impl BlockMeta {
239    /// Format:
240    ///
241    /// ```plain
242    /// | offset (4B) | len (4B) | uncompressed size (4B) | smallest key len (4B) | smallest key |
243    /// ```
244    pub fn encode(&self, mut buf: impl BufMut) {
245        buf.put_u32_le(self.offset);
246        buf.put_u32_le(self.len);
247        buf.put_u32_le(self.uncompressed_size);
248        buf.put_u32_le(self.total_key_count);
249        buf.put_u32_le(self.stale_key_count);
250        put_length_prefixed_slice(buf, &self.smallest_key);
251    }
252
253    pub fn decode(buf: &mut &[u8]) -> Self {
254        let offset = buf.get_u32_le();
255        let len = buf.get_u32_le();
256        let uncompressed_size = buf.get_u32_le();
257
258        let total_key_count = buf.get_u32_le();
259        let stale_key_count = buf.get_u32_le();
260        let smallest_key = get_length_prefixed_slice(buf);
261        Self {
262            smallest_key,
263            offset,
264            len,
265            uncompressed_size,
266            total_key_count,
267            stale_key_count,
268        }
269    }
270
271    pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
272        let offset = buf.get_u32_le();
273        let len = buf.get_u32_le();
274        let uncompressed_size = buf.get_u32_le();
275        let total_key_count = 0;
276        let stale_key_count = 0;
277        let smallest_key = get_length_prefixed_slice(buf);
278        Self {
279            smallest_key,
280            offset,
281            len,
282            uncompressed_size,
283            total_key_count,
284            stale_key_count,
285        }
286    }
287
288    #[inline]
289    pub fn encoded_size(&self) -> usize {
290        24 /* offset + len + key len + uncompressed size + total key count + stale key count */ + self.smallest_key.len()
291    }
292
293    pub fn table_id(&self) -> TableId {
294        FullKey::decode(&self.smallest_key).user_key.table_id
295    }
296
297    fn estimated_heap_size(&self) -> usize {
298        self.smallest_key.capacity()
299    }
300}
301
/// Decoded SST meta block. The binary layout is documented on `SstableMeta::encode_to_bytes`.
///
/// This type is also (de)serialized with serde as part of an [`Sstable`]. Serde derives write
/// fields in declaration order (e.g. for bincode), so do not reorder the fields.
#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    /// One entry per block, in encoding order.
    pub block_metas: Vec<BlockMeta>,
    /// Raw SST filter bytes. `Sstable::new` moves them out (leaving this empty) into the
    /// handle's filter reader.
    pub bloom_filter: Vec<u8>,
    /// Estimated size recorded by the writer — NOTE(review): exact meaning not visible here;
    /// confirm against the builder.
    pub estimated_size: u32,
    /// Key count recorded by the writer.
    pub key_count: u32,
    /// Smallest key of the SST, encoded as a full key (it is decoded with `FullKey::decode`
    /// for overflow diagnostics in `encode_to`).
    pub smallest_key: Vec<u8>,
    /// Largest key of the SST.
    pub largest_key: Vec<u8>,
    /// File offset of this meta block.
    pub meta_offset: u64,
    /// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11, delete ranges
    /// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`
    /// can be transformed into events below:
    /// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
    /// -epoch3> }`
    /// Then we can get monotonic events (they are in order by user key) as below:
    /// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
    /// which means that delete range of [0, wmk1) is epoch1, delete range of [wmk1, wmk2) is
    /// epoch2, etc. In this example, at the event key wmk1 (5), delete range changes from
    /// epoch1 to epoch2, thus the `new epoch` is epoch2. epoch2 will be used from the event
    /// key wmk1 (5) and till the next event key wmk2 (7) (not inclusive).
    /// If there is no range deletes between current event key and next event key, `new_epoch` will
    /// be `HummockEpoch::MAX`.
    ///
    /// Deprecated: still encoded and decoded for compatibility; `SstableMeta::decode` logs a
    /// warning when it reads a non-empty list.
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    /// Format version, for further compatibility.
    pub version: u32,
}
329
330impl SstableMeta {
331    /// Format:
332    ///
333    /// ```plain
334    /// | N (4B) |
335    /// | block meta 0 | ... | block meta N-1 |
336    /// | SST filter len (4B) | SST filter |
337    /// | estimated size (4B) | key count (4B) |
338    /// | smallest key len (4B) | smallest key |
339    /// | largest key len (4B) | largest key |
340    /// | K (4B) |
341    /// | tombstone-event 0 | ... | tombstone-event K-1 |
342    /// | file offset of this meta block (8B) |
343    /// | checksum (8B) | version (4B) | magic (4B) |
344    /// ```
345    pub fn encode_to_bytes(&self) -> Vec<u8> {
346        let encoded_size = self.encoded_size();
347        let mut buf = Vec::with_capacity(encoded_size);
348        self.encode_to(&mut buf);
349        buf
350    }
351
352    pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
353        let start = buf.as_ref().len();
354
355        buf.put_u32_le(
356            utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
357                let tmp_full_key = FullKey::decode(&self.smallest_key);
358                panic!(
359                    "WARN overflow can't convert block_metas_len {} into u32 table {}",
360                    self.block_metas.len(),
361                    tmp_full_key.user_key.table_id,
362                )
363            }),
364        );
365        for block_meta in &self.block_metas {
366            block_meta.encode(&mut buf);
367        }
368        put_length_prefixed_slice(&mut buf, &self.bloom_filter);
369        buf.put_u32_le(self.estimated_size);
370        buf.put_u32_le(self.key_count);
371        put_length_prefixed_slice(&mut buf, &self.smallest_key);
372        put_length_prefixed_slice(&mut buf, &self.largest_key);
373        #[expect(deprecated)]
374        buf.put_u32_le(
375            utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
376                let tmp_full_key = FullKey::decode(&self.smallest_key);
377                panic!(
378                    "WARN overflow can't convert monotonic_tombstone_events_len {} into u32 table {}",
379                    self.monotonic_tombstone_events.len(),
380                    tmp_full_key.user_key.table_id,
381                )
382            }),
383        );
384        #[expect(deprecated)]
385        for monotonic_tombstone_event in &self.monotonic_tombstone_events {
386            monotonic_tombstone_event.encode(&mut buf);
387        }
388        buf.put_u64_le(self.meta_offset);
389
390        let end = buf.as_ref().len();
391
392        let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
393        buf.put_u64_le(checksum);
394        buf.put_u32_le(VERSION);
395        buf.put_u32_le(MAGIC);
396    }
397
398    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
399        let mut cursor = buf.len();
400
401        cursor -= 4;
402        let magic = (&buf[cursor..]).get_u32_le();
403        if magic != MAGIC {
404            return Err(HummockError::magic_mismatch(MAGIC, magic));
405        }
406
407        cursor -= 4;
408        let version = (&buf[cursor..cursor + 4]).get_u32_le();
409        if version != VERSION && version != OLD_VERSION {
410            return Err(HummockError::invalid_format_version(version));
411        }
412
413        cursor -= 8;
414        let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
415        let buf = &mut &buf[..cursor];
416        xxhash64_verify(buf, checksum)?;
417
418        let block_meta_count = buf.get_u32_le() as usize;
419        let mut block_metas = Vec::with_capacity(block_meta_count);
420        if version == OLD_VERSION {
421            for _ in 0..block_meta_count {
422                block_metas.push(BlockMeta::decode_from_v1(buf));
423            }
424        } else {
425            for _ in 0..block_meta_count {
426                block_metas.push(BlockMeta::decode(buf));
427            }
428        }
429
430        let bloom_filter = get_length_prefixed_slice(buf);
431        let estimated_size = buf.get_u32_le();
432        let key_count = buf.get_u32_le();
433        let smallest_key = get_length_prefixed_slice(buf);
434        let largest_key = get_length_prefixed_slice(buf);
435        let tomb_event_count = buf.get_u32_le() as usize;
436        let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
437        for _ in 0..tomb_event_count {
438            let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
439            monotonic_tombstone_events.push(monotonic_tombstone_event);
440        }
441        let meta_offset = buf.get_u64_le();
442
443        if !monotonic_tombstone_events.is_empty() {
444            warn!(
445                count = monotonic_tombstone_events.len(),
446                tables = ?monotonic_tombstone_events
447                    .iter()
448                    .map(|event| event.event_key.left_user_key.table_id)
449                    .collect::<HashSet<_>>(),
450                "read non-empty range tombstones");
451        }
452
453        #[expect(deprecated)]
454        Ok(Self {
455            block_metas,
456            bloom_filter,
457            estimated_size,
458            key_count,
459            smallest_key,
460            largest_key,
461            meta_offset,
462            monotonic_tombstone_events,
463            version,
464        })
465    }
466
467    #[inline]
468    pub fn encoded_size(&self) -> usize {
469        4 // block meta count
470            + self
471            .block_metas
472            .iter()
473            .map(|block_meta| block_meta.encoded_size())
474            .sum::<usize>()
475            + 4 // monotonic tombstone events len
476            + 4 // SST filter len
477            + self.bloom_filter.len()
478            + 4 // estimated size
479            + 4 // key count
480            + 4 // key len
481            + self.smallest_key.len()
482            + 4 // key len
483            + self.largest_key.len()
484            + 8 // footer
485            + 8 // checksum
486            + 4 // version
487            + 4 // magic
488    }
489
490    #[expect(
491        deprecated,
492        reason = "monotonic_tombstone_events is deprecated but still contributes to decoded meta heap size"
493    )]
494    fn estimated_heap_size(&self) -> usize {
495        self.block_metas.capacity() * std::mem::size_of::<BlockMeta>()
496            + self
497                .block_metas
498                .iter()
499                .map(BlockMeta::estimated_heap_size)
500                .sum::<usize>()
501            + self.bloom_filter.capacity()
502            + self.smallest_key.capacity()
503            + self.largest_key.capacity()
504            + self.monotonic_tombstone_events.capacity()
505                * std::mem::size_of::<MonotonicDeleteEvent>()
506    }
507}
508
/// Options passed to SST iterators; `from_read_options` derives them from store-level
/// `ReadOptions`.
#[derive(Default)]
pub struct SstableIteratorReadOptions {
    /// Cache policy for block reads, copied from `ReadOptions::cache_policy` by
    /// `from_read_options`.
    pub cache_policy: CachePolicy,
    /// Optional end bound on user keys the iterator must reach — NOTE(review): exact use
    /// (e.g. sizing prefetch) is not visible here; confirm at iterator call sites.
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    /// Maximum number of preload retries; `from_read_options` sets 0.
    pub max_preload_retry_times: usize,
    /// Large-query prefetch flag, copied from `ReadOptions::prefetch_options.for_large_query`
    /// by `from_read_options`.
    pub prefetch_for_large_query: bool,
}
516
517impl SstableIteratorReadOptions {
518    pub fn from_read_options(read_options: &ReadOptions) -> Self {
519        Self {
520            cache_policy: read_options.cache_policy,
521            must_iterated_end_user_key: None,
522            max_preload_retry_times: 0,
523            prefetch_for_large_query: read_options.prefetch_options.for_large_query,
524        }
525    }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

    /// Round-trips a meta through `encode_to_bytes` / `decode` and checks that
    /// `encoded_size` predicts the encoded length exactly.
    #[test]
    fn test_sstable_meta_enc_dec() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![
                BlockMeta {
                    smallest_key: b"0-smallest-key".to_vec(),
                    len: 100,
                    ..Default::default()
                },
                BlockMeta {
                    smallest_key: b"5-some-key".to_vec(),
                    offset: 100,
                    len: 100,
                    ..Default::default()
                },
            ],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let sz = meta.encoded_size();
        let buf = meta.encode_to_bytes();
        assert_eq!(sz, buf.len());
        let decoded_meta = SstableMeta::decode(&buf[..]).unwrap();
        assert_eq!(decoded_meta, meta);
    }

    /// Checks that `Sstable` serde keeps or drops the SST filter according to
    /// `skip_bloom_filter_in_serde`.
    #[tokio::test]
    async fn test_sstable_serde() {
        let (_, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            (0..100).map(|x| {
                (
                    iterator_test_key_of(x),
                    HummockValue::put(format!("overlapped_new_{x}").into_bytes()),
                )
            }),
        )
        .await;

        // skip sst serde
        let sstable = Sstable::new(42.into(), meta.clone(), true);

        let buffer = bincode::serialize(&sstable).unwrap();

        let s: Sstable = bincode::deserialize(&buffer).unwrap();

        assert_eq!(s.id, sstable.id);
        assert_eq!(s.meta, sstable.meta);
        assert!(!sstable.filter_reader.is_empty());
        // The table filter reader is empty because the SST filter is skipped in serde.
        assert!(s.filter_reader.is_empty());

        // enable sst serde
        let sstable = Sstable::new(42.into(), meta, false);

        let buffer = bincode::serialize(&sstable).unwrap();

        let s: Sstable = bincode::deserialize(&buffer).unwrap();

        assert_eq!(s.id, sstable.id);
        assert_eq!(s.meta, sstable.meta);
        assert_eq!(
            s.filter_reader.encode_to_bytes(),
            sstable.filter_reader.encode_to_bytes()
        );
    }
}