risingwave_storage/hummock/sstable/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Hummock state store's SST builder, format and iterator
16
17// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
18mod block;
19
20use std::collections::HashSet;
21use std::fmt::{Debug, Formatter};
22use std::ops::{BitXor, Bound, Range};
23
24pub use block::*;
25mod block_iterator;
26pub use block_iterator::*;
27mod xor_filter;
28use serde::{Deserialize, Serialize};
29pub use xor_filter::{
30    BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder,
31    XorFilterReader,
32};
33pub mod builder;
34pub use builder::*;
35pub mod writer;
36use risingwave_common::catalog::TableId;
37pub use writer::*;
38mod forward_sstable_iterator;
39pub mod multi_builder;
40use bytes::{Buf, BufMut};
41pub use forward_sstable_iterator::*;
42use tracing::warn;
43mod backward_sstable_iterator;
44pub use backward_sstable_iterator::*;
45use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
46use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};
47
48mod filter;
49mod utils;
50
51pub use filter::FilterBuilder;
52pub use utils::{CompressionAlgorithm, xxhash64_checksum, xxhash64_verify};
53use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
54use xxhash_rust::xxh64;
55
56use super::{HummockError, HummockResult};
57use crate::hummock::CachePolicy;
58use crate::store::ReadOptions;
59
60const MAGIC: u32 = 0x5785ab73;
61const OLD_VERSION: u32 = 1;
62const VERSION: u32 = 2;
63
/// One entry of the monotonic range-tombstone event list stored in an SST meta.
///
/// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11, delete ranges
/// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`
/// can be transformed into events below:
/// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
/// -epoch3> }`
/// Then we can get monotonic events (they are in order by user key) as below:
/// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
/// which means that delete range of [0, wmk1) is epoch1, delete range of [wmk1, wmk2) is epoch2,
/// etc. In this example, at the event key wmk1 (5), delete range changes from epoch1 to epoch2,
/// thus the `new epoch` is epoch2. epoch2 will be used from the event key wmk1 (5) and till the
/// next event key wmk2 (7) (not inclusive).
/// If there is no range deletes between current event key and next event key, `new_epoch` will be
/// `HummockEpoch::MAX`.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    /// Key at which the delete-range epoch changes (left user key plus an "exclude left key"
    /// flag, see `encode` / `decode`).
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    /// Epoch that applies from `event_key` up to the next event key (not inclusive).
    pub new_epoch: HummockEpoch,
}
83
84impl MonotonicDeleteEvent {
85    pub fn encode(&self, mut buf: impl BufMut) {
86        self.event_key
87            .left_user_key
88            .encode_length_prefixed(&mut buf);
89        buf.put_u8(if self.event_key.is_exclude_left_key {
90            1
91        } else {
92            0
93        });
94        buf.put_u64_le(self.new_epoch);
95    }
96
97    pub fn decode(buf: &mut &[u8]) -> Self {
98        use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
99        let user_key = UserKey::decode_length_prefixed(buf);
100        let exclude_left_key_flag = buf.get_u8();
101        let is_exclude_left_key = match exclude_left_key_flag {
102            0 => false,
103            1 => true,
104            _ => panic!("exclusive flag should be either 0 or 1"),
105        };
106        let new_epoch = buf.get_u64_le();
107        Self {
108            event_key: PointRange {
109                left_user_key: user_key,
110                is_exclude_left_key,
111            },
112            new_epoch,
113        }
114    }
115}
116
/// Serde-facing representation of an [`Sstable`]: only the object id and the meta.
///
/// [`Sstable`] serializes itself by building one of these and deserializes through
/// `From<SerdeSstable>`.
#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    /// Object id of the SST.
    id: HummockSstableObjectId,
    /// SST meta as it is written by serde.
    meta: SstableMeta,
}
122
123impl From<SerdeSstable> for Sstable {
124    fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
125        // Set skip_bloom_filter_in_serde to false because the behavior
126        // is determined by the serializer
127        Sstable::new(id, meta, false)
128    }
129}
130
/// [`Sstable`] is a handle for accessing SST.
///
/// Deserialization goes through `SerdeSstable` (see the `serde(from = ...)` attribute);
/// serialization is implemented by hand in `impl Serialize for Sstable`.
#[derive(Clone, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    /// Object id of the SST.
    pub id: HummockSstableObjectId,
    /// SST meta. `Sstable::new` moves `bloom_filter` out of it, so that field is left empty
    /// here and the filter data is held by `filter_reader` instead.
    pub meta: SstableMeta,
    /// Filter reader built in `Sstable::new` from the meta's filter bytes and block metas.
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
    /// SST serde happens when an SST meta is written to meta disk cache.
    /// Excluding the SST filter from serde can reduce the meta disk cache entry size
    /// and reduce disk IO throughput at the cost of making the SST filter useless.
    #[serde(skip)]
    skip_bloom_filter_in_serde: bool,
}
145
146impl Serialize for Sstable {
147    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
148    where
149        S: serde::Serializer,
150    {
151        let mut serde_sstable = SerdeSstable {
152            id: self.id,
153            meta: self.meta.clone(),
154        };
155        if !self.skip_bloom_filter_in_serde {
156            serde_sstable.meta.bloom_filter = self.filter_reader.encode_to_bytes();
157        }
158        serde_sstable.serialize(serializer)
159    }
160}
161
162impl Debug for Sstable {
163    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("Sstable")
165            .field("id", &self.id)
166            .field("meta", &self.meta)
167            .finish()
168    }
169}
170
171impl Sstable {
172    pub fn new(
173        id: HummockSstableObjectId,
174        mut meta: SstableMeta,
175        skip_bloom_filter_in_serde: bool,
176    ) -> Self {
177        let filter_data = std::mem::take(&mut meta.bloom_filter);
178        let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
179        Self {
180            id,
181            meta,
182            filter_reader,
183            skip_bloom_filter_in_serde,
184        }
185    }
186
187    #[inline(always)]
188    pub fn has_filter(&self) -> bool {
189        !self.filter_reader.is_empty()
190    }
191
192    pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
193        let block_meta = &self.meta.block_metas[block_index];
194        let range =
195            block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
196        let uncompressed_capacity = block_meta.uncompressed_size as usize;
197        (range, uncompressed_capacity)
198    }
199
200    #[inline(always)]
201    pub fn hash_for_filter(dist_key: &[u8], table_id: u32) -> u64 {
202        let dist_key_hash = xxh64::xxh64(dist_key, 0);
203        // congyi adds this because he aims to dedup keys in different tables
204        (table_id as u64).bitxor(dist_key_hash)
205    }
206
207    #[inline(always)]
208    pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
209        self.filter_reader
210            .may_match(&self.meta.block_metas, user_key_range, hash)
211    }
212
213    #[inline(always)]
214    pub fn block_count(&self) -> usize {
215        self.meta.block_metas.len()
216    }
217
218    #[inline(always)]
219    pub fn estimate_size(&self) -> usize {
220        8 /* id */ + self.filter_reader.estimate_size() + self.meta.encoded_size()
221    }
222}
223
/// Metadata of a single block, as stored in [`SstableMeta::block_metas`].
#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    /// Encoded key bytes; `table_id()` decodes them as a `FullKey`.
    pub smallest_key: Vec<u8>,
    /// Start of the block; `Sstable::calculate_block_info` uses `offset..offset + len` as the
    /// block's range.
    pub offset: u32,
    /// Length of the block (see `offset`).
    pub len: u32,
    /// Uncompressed size of the block.
    pub uncompressed_size: u32,
    /// Not present in format version 1: `decode_from_v1` sets it to 0.
    /// NOTE(review): presumably the number of keys in the block — confirm against the builder.
    pub total_key_count: u32,
    /// Not present in format version 1: `decode_from_v1` sets it to 0.
    /// NOTE(review): presumably the number of stale keys in the block — confirm against the
    /// builder.
    pub stale_key_count: u32,
}
233
234impl BlockMeta {
235    /// Format:
236    ///
237    /// ```plain
238    /// | offset (4B) | len (4B) | uncompressed size (4B) | smallest key len (4B) | smallest key |
239    /// ```
240    pub fn encode(&self, mut buf: impl BufMut) {
241        buf.put_u32_le(self.offset);
242        buf.put_u32_le(self.len);
243        buf.put_u32_le(self.uncompressed_size);
244        buf.put_u32_le(self.total_key_count);
245        buf.put_u32_le(self.stale_key_count);
246        put_length_prefixed_slice(buf, &self.smallest_key);
247    }
248
249    pub fn decode(buf: &mut &[u8]) -> Self {
250        let offset = buf.get_u32_le();
251        let len = buf.get_u32_le();
252        let uncompressed_size = buf.get_u32_le();
253
254        let total_key_count = buf.get_u32_le();
255        let stale_key_count = buf.get_u32_le();
256        let smallest_key = get_length_prefixed_slice(buf);
257        Self {
258            smallest_key,
259            offset,
260            len,
261            uncompressed_size,
262            total_key_count,
263            stale_key_count,
264        }
265    }
266
267    pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
268        let offset = buf.get_u32_le();
269        let len = buf.get_u32_le();
270        let uncompressed_size = buf.get_u32_le();
271        let total_key_count = 0;
272        let stale_key_count = 0;
273        let smallest_key = get_length_prefixed_slice(buf);
274        Self {
275            smallest_key,
276            offset,
277            len,
278            uncompressed_size,
279            total_key_count,
280            stale_key_count,
281        }
282    }
283
284    #[inline]
285    pub fn encoded_size(&self) -> usize {
286        24 /* offset + len + key len + uncompressed size + total key count + stale key count */ + self.smallest_key.len()
287    }
288
289    pub fn table_id(&self) -> TableId {
290        FullKey::decode(&self.smallest_key).user_key.table_id
291    }
292}
293
/// Meta of an SST. See `SstableMeta::encode_to_bytes` for the binary layout.
#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    /// One entry per block of the SST.
    pub block_metas: Vec<BlockMeta>,
    /// Raw SST filter bytes. `Sstable::new` takes them out of the meta to build its filter
    /// reader.
    pub bloom_filter: Vec<u8>,
    /// Written and read as a 4-byte little-endian value.
    /// NOTE(review): what exactly is being estimated is not visible here — confirm against
    /// the builder.
    pub estimated_size: u32,
    /// Written and read as a 4-byte little-endian value.
    /// NOTE(review): presumably the number of keys in the SST — confirm against the builder.
    pub key_count: u32,
    /// Encoded key bytes; `encode_to` decodes them as a `FullKey` for its panic messages.
    pub smallest_key: Vec<u8>,
    /// Encoded key bytes of the largest key.
    pub largest_key: Vec<u8>,
    /// File offset of this meta block (see the format in `encode_to_bytes`).
    pub meta_offset: u64,
    /// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11, delete ranges
    /// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`
    /// can be transformed into events below:
    /// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3,
    /// -epoch3> }`
    /// Then we can get monotonic events (they are in order by user key) as below:
    /// `{ <0, epoch1>, <wmk1, epoch2>, <wmk2, epoch3>, <wmk3, +inf> }`
    /// which means that delete range of [0, wmk1) is epoch1, delete range of [wmk1, wmk2) is
    /// epoch2, etc. In this example, at the event key wmk1 (5), delete range changes from
    /// epoch1 to epoch2, thus the `new epoch` is epoch2. epoch2 will be used from the event
    /// key wmk1 (5) and till the next event key wmk2 (7) (not inclusive).
    /// If there is no range deletes between current event key and next event key, `new_epoch` will
    /// be `HummockEpoch::MAX`.
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    /// Format version, for further compatibility.
    pub version: u32,
}
321
322impl SstableMeta {
323    /// Format:
324    ///
325    /// ```plain
326    /// | N (4B) |
327    /// | block meta 0 | ... | block meta N-1 |
328    /// | SST filter len (4B) | SST filter |
329    /// | estimated size (4B) | key count (4B) |
330    /// | smallest key len (4B) | smallest key |
331    /// | largest key len (4B) | largest key |
332    /// | K (4B) |
333    /// | tombstone-event 0 | ... | tombstone-event K-1 |
334    /// | file offset of this meta block (8B) |
335    /// | checksum (8B) | version (4B) | magic (4B) |
336    /// ```
337    pub fn encode_to_bytes(&self) -> Vec<u8> {
338        let encoded_size = self.encoded_size();
339        let mut buf = Vec::with_capacity(encoded_size);
340        self.encode_to(&mut buf);
341        buf
342    }
343
344    pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
345        let start = buf.as_ref().len();
346
347        buf.put_u32_le(
348            utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
349                let tmp_full_key = FullKey::decode(&self.smallest_key);
350                panic!(
351                    "WARN overflow can't convert block_metas_len {} into u32 table {}",
352                    self.block_metas.len(),
353                    tmp_full_key.user_key.table_id,
354                )
355            }),
356        );
357        for block_meta in &self.block_metas {
358            block_meta.encode(&mut buf);
359        }
360        put_length_prefixed_slice(&mut buf, &self.bloom_filter);
361        buf.put_u32_le(self.estimated_size);
362        buf.put_u32_le(self.key_count);
363        put_length_prefixed_slice(&mut buf, &self.smallest_key);
364        put_length_prefixed_slice(&mut buf, &self.largest_key);
365        #[expect(deprecated)]
366        buf.put_u32_le(
367            utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
368                let tmp_full_key = FullKey::decode(&self.smallest_key);
369                panic!(
370                    "WARN overflow can't convert monotonic_tombstone_events_len {} into u32 table {}",
371                    self.monotonic_tombstone_events.len(),
372                    tmp_full_key.user_key.table_id,
373                )
374            }),
375        );
376        #[expect(deprecated)]
377        for monotonic_tombstone_event in &self.monotonic_tombstone_events {
378            monotonic_tombstone_event.encode(&mut buf);
379        }
380        buf.put_u64_le(self.meta_offset);
381
382        let end = buf.as_ref().len();
383
384        let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
385        buf.put_u64_le(checksum);
386        buf.put_u32_le(VERSION);
387        buf.put_u32_le(MAGIC);
388    }
389
390    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
391        let mut cursor = buf.len();
392
393        cursor -= 4;
394        let magic = (&buf[cursor..]).get_u32_le();
395        if magic != MAGIC {
396            return Err(HummockError::magic_mismatch(MAGIC, magic));
397        }
398
399        cursor -= 4;
400        let version = (&buf[cursor..cursor + 4]).get_u32_le();
401        if version != VERSION && version != OLD_VERSION {
402            return Err(HummockError::invalid_format_version(version));
403        }
404
405        cursor -= 8;
406        let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
407        let buf = &mut &buf[..cursor];
408        xxhash64_verify(buf, checksum)?;
409
410        let block_meta_count = buf.get_u32_le() as usize;
411        let mut block_metas = Vec::with_capacity(block_meta_count);
412        if version == OLD_VERSION {
413            for _ in 0..block_meta_count {
414                block_metas.push(BlockMeta::decode_from_v1(buf));
415            }
416        } else {
417            for _ in 0..block_meta_count {
418                block_metas.push(BlockMeta::decode(buf));
419            }
420        }
421
422        let bloom_filter = get_length_prefixed_slice(buf);
423        let estimated_size = buf.get_u32_le();
424        let key_count = buf.get_u32_le();
425        let smallest_key = get_length_prefixed_slice(buf);
426        let largest_key = get_length_prefixed_slice(buf);
427        let tomb_event_count = buf.get_u32_le() as usize;
428        let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
429        for _ in 0..tomb_event_count {
430            let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
431            monotonic_tombstone_events.push(monotonic_tombstone_event);
432        }
433        let meta_offset = buf.get_u64_le();
434
435        if !monotonic_tombstone_events.is_empty() {
436            warn!(
437                count = monotonic_tombstone_events.len(),
438                tables = ?monotonic_tombstone_events
439                    .iter()
440                    .map(|event| event.event_key.left_user_key.table_id)
441                    .collect::<HashSet<_>>(),
442                "read non-empty range tombstones");
443        }
444
445        #[expect(deprecated)]
446        Ok(Self {
447            block_metas,
448            bloom_filter,
449            estimated_size,
450            key_count,
451            smallest_key,
452            largest_key,
453            meta_offset,
454            monotonic_tombstone_events,
455            version,
456        })
457    }
458
459    #[inline]
460    pub fn encoded_size(&self) -> usize {
461        4 // block meta count
462            + self
463            .block_metas
464            .iter()
465            .map(|block_meta| block_meta.encoded_size())
466            .sum::<usize>()
467            + 4 // monotonic tombstone events len
468            + 4 // SST filter len
469            + self.bloom_filter.len()
470            + 4 // estimated size
471            + 4 // key count
472            + 4 // key len
473            + self.smallest_key.len()
474            + 4 // key len
475            + self.largest_key.len()
476            + 8 // footer
477            + 8 // checksum
478            + 4 // version
479            + 4 // magic
480    }
481}
482
/// Options for the SST iterators. Can be derived from `ReadOptions` with
/// `SstableIteratorReadOptions::from_read_options`.
#[derive(Default)]
pub struct SstableIteratorReadOptions {
    /// Cache policy; `from_read_options` copies it from `ReadOptions::cache_policy`.
    pub cache_policy: CachePolicy,
    /// `from_read_options` leaves this as `None`.
    /// NOTE(review): presumably an end bound the iterator is expected to reach — confirm
    /// against the iterator implementations.
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    /// `from_read_options` sets this to 0.
    /// NOTE(review): presumably the retry limit for preloading — confirm against the iterator
    /// implementations.
    pub max_preload_retry_times: usize,
    /// `from_read_options` copies it from `ReadOptions::prefetch_options.for_large_query`.
    pub prefetch_for_large_query: bool,
}
490
491impl SstableIteratorReadOptions {
492    pub fn from_read_options(read_options: &ReadOptions) -> Self {
493        Self {
494            cache_policy: read_options.cache_policy,
495            must_iterated_end_user_key: None,
496            max_preload_retry_times: 0,
497            prefetch_for_large_query: read_options.prefetch_options.for_large_query,
498        }
499    }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

    /// Encoding a meta and decoding the bytes gives back an equal meta, and `encoded_size`
    /// matches the number of bytes actually produced.
    #[test]
    fn test_sstable_meta_enc_dec() {
        let first_block = BlockMeta {
            smallest_key: b"0-smallest-key".to_vec(),
            len: 100,
            ..Default::default()
        };
        let second_block = BlockMeta {
            smallest_key: b"5-some-key".to_vec(),
            offset: 100,
            len: 100,
            ..Default::default()
        };
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![first_block, second_block],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };

        let expected_len = meta.encoded_size();
        let encoded = meta.encode_to_bytes();
        assert_eq!(expected_len, encoded.len());

        let round_tripped = SstableMeta::decode(&encoded[..]).unwrap();
        assert_eq!(round_tripped, meta);

        println!("buf: {}", encoded.len());
    }

    /// Serde round trip of an `Sstable`, with and without the SST filter being skipped.
    #[tokio::test]
    async fn test_sstable_serde() {
        let kv_pairs = (0..100).map(|idx| {
            let key = iterator_test_key_of(idx);
            let value = format!("overlapped_new_{}", idx).into_bytes();
            (key, HummockValue::put(value))
        });
        let (_, meta) = gen_test_sstable_data(default_builder_opt_for_test(), kv_pairs).await;

        // Serialize with bincode and immediately deserialize again.
        let round_trip = |sst: &Sstable| -> Sstable {
            let bytes = bincode::serialize(sst).unwrap();
            bincode::deserialize(&bytes).unwrap()
        };

        // skip sst serde
        let skipping = Sstable::new(42.into(), meta.clone(), true);
        let restored = round_trip(&skipping);

        assert_eq!(restored.id, skipping.id);
        assert_eq!(restored.meta, skipping.meta);
        assert!(!skipping.filter_reader.is_empty());
        // The table filter reader is empty because the SST filter is skipped in serde.
        assert!(restored.filter_reader.is_empty());

        // enable sst serde
        let keeping = Sstable::new(42.into(), meta, false);
        let restored = round_trip(&keeping);

        assert_eq!(restored.id, keeping.id);
        assert_eq!(restored.meta, keeping.meta);
        assert_eq!(
            restored.filter_reader.encode_to_bytes(),
            keeping.filter_reader.encode_to_bytes()
        );
    }
}