risingwave_storage/hummock/sstable/
bloom.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
16
17use std::f64;
18use std::sync::Arc;
19
20use bytes::BufMut;
21
22use super::Sstable;
23use super::filter::FilterBuilder;
24use crate::hummock::MemoryLimiter;
25
26pub trait BitSlice {
27    fn get_bit(&self, idx: usize) -> bool;
28    fn bit_len(&self) -> usize;
29}
30
31pub trait BitSliceMut {
32    fn set_bit(&mut self, idx: usize, val: bool);
33}
34
35impl<T: AsRef<[u8]>> BitSlice for T {
36    fn get_bit(&self, idx: usize) -> bool {
37        let pos = idx / 8;
38        let offset = idx % 8;
39        (self.as_ref()[pos] & (1 << offset)) != 0
40    }
41
42    fn bit_len(&self) -> usize {
43        self.as_ref().len() * 8
44    }
45}
46
47impl<T: AsMut<[u8]>> BitSliceMut for T {
48    fn set_bit(&mut self, idx: usize, val: bool) {
49        let pos = idx / 8;
50        let offset = idx % 8;
51        if val {
52            self.as_mut()[pos] |= 1 << offset;
53        } else {
54            self.as_mut()[pos] &= !(1 << offset);
55        }
56    }
57}
58
59/// Bloom implements Bloom filter functionalities over a bit-slice of data.
60#[allow(dead_code)]
61#[derive(Clone)]
62pub struct BloomFilterReader {
63    /// data of filter in bits
64    data: Vec<u8>,
65    /// number of hash functions
66    k: u8,
67}
68
69impl BloomFilterReader {
70    /// Creates a Bloom filter from a byte slice
71    #[allow(dead_code)]
72    pub fn new(mut buf: Vec<u8>) -> Self {
73        if buf.len() <= 1 {
74            return Self { data: vec![], k: 0 };
75        }
76        let k = buf[buf.len() - 1];
77        buf.resize(buf.len() - 1, 0);
78        Self { data: buf, k }
79    }
80
81    #[allow(dead_code)]
82    pub fn is_empty(&self) -> bool {
83        self.data.is_empty()
84    }
85
86    #[allow(dead_code)]
87    pub fn get_raw_data(&self) -> &[u8] {
88        &self.data
89    }
90
91    /// Judges whether the hash value is in the table with the given false positive rate.
92    ///
93    /// Note:
94    ///   - if the return value is false, then the table surely does not have the user key that has
95    ///     the hash;
96    ///   - if the return value is true, then the table may or may not have the user key that has
97    ///     the hash actually, a.k.a. we don't know the answer.
98    #[allow(dead_code)]
99    pub fn may_match(&self, mut h: u32) -> bool {
100        if self.k > 30 || self.k == 00 {
101            // potential new encoding for short Bloom filters
102            true
103        } else {
104            let nbits = self.data.bit_len();
105            let delta = h.rotate_left(15);
106            for _ in 0..self.k {
107                let bit_pos = h % (nbits as u32);
108                if !self.data.get_bit(bit_pos as usize) {
109                    return false;
110                }
111                h = h.wrapping_add(delta);
112            }
113            true
114        }
115    }
116}
117
118pub struct BloomFilterBuilder {
119    key_hash_entries: Vec<u32>,
120    bits_per_key: usize,
121}
122
123impl BloomFilterBuilder {
124    pub fn new(bloom_false_positive: f64, capacity: usize) -> Self {
125        let key_hash_entries = if capacity > 0 {
126            Vec::with_capacity(capacity)
127        } else {
128            vec![]
129        };
130        let bits_per_key = bloom_bits_per_key(capacity, bloom_false_positive);
131        Self {
132            key_hash_entries,
133            bits_per_key,
134        }
135    }
136}
137
138/// Gets Bloom filter bits per key from entries count and FPR
139pub fn bloom_bits_per_key(entries: usize, false_positive_rate: f64) -> usize {
140    let size = -1.0 * (entries as f64) * false_positive_rate.ln() / f64::consts::LN_2.powi(2);
141    let locs = (size / (entries as f64)).ceil();
142    locs as usize
143}
144
145impl FilterBuilder for BloomFilterBuilder {
146    fn add_key(&mut self, key: &[u8], table_id: u32) {
147        self.key_hash_entries
148            .push(Sstable::hash_for_bloom_filter_u32(key, table_id));
149    }
150
151    fn approximate_len(&self) -> usize {
152        self.key_hash_entries.len() * 4
153    }
154
155    fn finish(&mut self, memory_limiter: Option<Arc<MemoryLimiter>>) -> Vec<u8> {
156        // FIXME: If Bloom is enabled, a more accurate memory calculation is used for Bloom
157        let _memory_tracker = memory_limiter.as_ref().map(|memory_limit| {
158            memory_limit.must_require_memory(self.approximate_building_memory() as u64)
159        });
160
161        // 0.69 is approximately ln(2)
162        let k = ((self.bits_per_key as f64) * 0.69) as u32;
163        // limit k in [1, 30]
164        let k = k.clamp(1, 30);
165        // For small len(keys), we set a minimum Bloom filter length to avoid high FPR
166        let nbits = (self.key_hash_entries.len() * self.bits_per_key).max(64);
167        let nbytes = nbits.div_ceil(8);
168        // nbits is always multiplication of 8
169        let nbits = nbytes * 8;
170        let mut filter = Vec::with_capacity(nbytes + 1);
171        filter.resize(nbytes, 0);
172        for h in &self.key_hash_entries {
173            let mut h = *h;
174            let delta = h.rotate_left(15);
175            for _ in 0..k {
176                let bit_pos = (h as usize) % nbits;
177                filter.set_bit(bit_pos, true);
178                h = h.wrapping_add(delta);
179            }
180        }
181        filter.put_u8(k as u8);
182        self.key_hash_entries.clear();
183        filter
184    }
185
186    fn create(fpr: f64, capacity: usize) -> Self {
187        BloomFilterBuilder::new(fpr, capacity)
188    }
189
190    fn approximate_building_memory(&self) -> usize {
191        self.approximate_len()
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use std::ops::BitXor;
198
199    use bytes::Bytes;
200    use xxhash_rust::xxh32;
201
202    use super::*;
203
204    #[test]
205    fn test_small_bloom_filter() {
206        let mut builder = BloomFilterBuilder::new(0.01, 2);
207        builder.add_key(b"hello", 0);
208        builder.add_key(b"world", 0);
209        let buf = builder.finish(None);
210
211        let check_hash: Vec<u32> = vec![
212            b"hello".to_vec(),
213            b"world".to_vec(),
214            b"x".to_vec(),
215            b"fool".to_vec(),
216        ]
217        .into_iter()
218        .map(|x| xxh32::xxh32(&x, 0).bitxor(0))
219        .collect();
220
221        let f = BloomFilterReader::new(buf);
222        assert_eq!(f.k, 6);
223
224        assert!(f.may_match(check_hash[0]));
225        assert!(f.may_match(check_hash[1]));
226        assert!(!f.may_match(check_hash[2]));
227        assert!(!f.may_match(check_hash[3]));
228    }
229
230    fn false_positive_rate_case(
231        preset_key_count: usize,
232        test_key_count: usize,
233        expected_false_positive_rate: f64,
234    ) {
235        let mut builder = BloomFilterBuilder::new(expected_false_positive_rate, preset_key_count);
236        for i in 0..preset_key_count {
237            let k = Bytes::from(format!("{:032}", i));
238            builder.add_key(&k, 0);
239        }
240
241        let data = builder.finish(None);
242        let filter = BloomFilterReader::new(data);
243
244        let mut true_count = 0;
245        for i in preset_key_count..preset_key_count + test_key_count {
246            let k = Bytes::from(format!("{:032}", i));
247            let h = xxh32::xxh32(&k, 0);
248            if !filter.may_match(h) {
249                true_count += 1;
250            }
251        }
252
253        let false_positive_rate = 1_f64 - true_count as f64 / test_key_count as f64;
254        assert!(false_positive_rate < 3_f64 * expected_false_positive_rate);
255    }
256
257    #[test]
258    #[ignore]
259    fn test_false_positive_rate() {
260        const KEY_COUNT: usize = 1300000;
261        const TEST_KEY_COUNT: usize = 100000;
262        false_positive_rate_case(KEY_COUNT, TEST_KEY_COUNT, 0.1);
263        false_positive_rate_case(KEY_COUNT, TEST_KEY_COUNT, 0.01);
264        false_positive_rate_case(KEY_COUNT, TEST_KEY_COUNT, 0.001);
265    }
266}