risingwave_storage/hummock/sstable/
bloom.rs1use std::f64;
18use std::sync::Arc;
19
20use bytes::BufMut;
21
22use super::Sstable;
23use super::filter::FilterBuilder;
24use crate::hummock::MemoryLimiter;
25
/// Read-only, bit-addressed view over some underlying storage.
///
/// The blanket implementation in this file covers every `AsRef<[u8]>` type and
/// numbers bits from the least-significant bit of the first byte.
pub trait BitSlice {
    /// Returns the total number of addressable bits.
    fn bit_len(&self) -> usize;

    /// Returns `true` when the bit at position `idx` is set.
    fn get_bit(&self, idx: usize) -> bool;
}
30
/// Mutable, bit-addressed view over some underlying storage.
pub trait BitSliceMut {
    /// Sets the bit at position `idx` when `val` is `true` and clears it otherwise.
    fn set_bit(&mut self, idx: usize, val: bool);
}
34
35impl<T: AsRef<[u8]>> BitSlice for T {
36 fn get_bit(&self, idx: usize) -> bool {
37 let pos = idx / 8;
38 let offset = idx % 8;
39 (self.as_ref()[pos] & (1 << offset)) != 0
40 }
41
42 fn bit_len(&self) -> usize {
43 self.as_ref().len() * 8
44 }
45}
46
47impl<T: AsMut<[u8]>> BitSliceMut for T {
48 fn set_bit(&mut self, idx: usize, val: bool) {
49 let pos = idx / 8;
50 let offset = idx % 8;
51 if val {
52 self.as_mut()[pos] |= 1 << offset;
53 } else {
54 self.as_mut()[pos] &= !(1 << offset);
55 }
56 }
57}
58
/// Read side of a Bloom filter: holds a decoded filter and answers membership
/// probes against it.
#[allow(dead_code)]
#[derive(Clone)]
pub struct BloomFilterReader {
    /// Filter bit array, without the trailing probe-count byte of the encoded form.
    data: Vec<u8>,
    /// Number of bit positions probed per lookup.
    k: u8,
}
68
69impl BloomFilterReader {
70 #[allow(dead_code)]
72 pub fn new(mut buf: Vec<u8>) -> Self {
73 if buf.len() <= 1 {
74 return Self { data: vec![], k: 0 };
75 }
76 let k = buf[buf.len() - 1];
77 buf.resize(buf.len() - 1, 0);
78 Self { data: buf, k }
79 }
80
81 #[allow(dead_code)]
82 pub fn is_empty(&self) -> bool {
83 self.data.is_empty()
84 }
85
86 #[allow(dead_code)]
87 pub fn get_raw_data(&self) -> &[u8] {
88 &self.data
89 }
90
91 #[allow(dead_code)]
99 pub fn may_match(&self, mut h: u32) -> bool {
100 if self.k > 30 || self.k == 00 {
101 true
103 } else {
104 let nbits = self.data.bit_len();
105 let delta = h.rotate_left(15);
106 for _ in 0..self.k {
107 let bit_pos = h % (nbits as u32);
108 if !self.data.get_bit(bit_pos as usize) {
109 return false;
110 }
111 h = h.wrapping_add(delta);
112 }
113 true
114 }
115 }
116}
117
/// Write side of a Bloom filter: buffers key hashes and encodes them into a
/// filter when the `FilterBuilder::finish` implementation is called.
pub struct BloomFilterBuilder {
    /// 32-bit hashes of the keys added so far.
    key_hash_entries: Vec<u32>,
    /// Number of filter bits budgeted per key.
    bits_per_key: usize,
}
122
123impl BloomFilterBuilder {
124 pub fn new(bloom_false_positive: f64, capacity: usize) -> Self {
125 let key_hash_entries = if capacity > 0 {
126 Vec::with_capacity(capacity)
127 } else {
128 vec![]
129 };
130 let bits_per_key = bloom_bits_per_key(capacity, bloom_false_positive);
131 Self {
132 key_hash_entries,
133 bits_per_key,
134 }
135 }
136}
137
/// Returns the number of filter bits to budget per key for a target
/// `false_positive_rate`.
///
/// The total size is computed as `-entries * ln(p) / ln(2)^2` bits and then
/// divided by `entries` and rounded up, so the result does not depend on the
/// entry count mathematically.
///
/// `entries == 0` is treated as a single entry. Otherwise the per-key division
/// would be `0.0 / 0.0 = NaN`, which casts to `0` bits per key.
///
/// `false_positive_rate` is expected to lie strictly between 0 and 1; rates of 1
/// or above yield `0`.
pub fn bloom_bits_per_key(entries: usize, false_positive_rate: f64) -> usize {
    // For `entries > 0` this performs exactly the same floating-point operations as
    // before, so existing results are unchanged.
    let entries = entries.max(1) as f64;
    let size = -1.0 * entries * false_positive_rate.ln() / std::f64::consts::LN_2.powi(2);
    let locs = (size / entries).ceil();
    locs as usize
}
144
145impl FilterBuilder for BloomFilterBuilder {
146 fn add_key(&mut self, key: &[u8], table_id: u32) {
147 self.key_hash_entries
148 .push(Sstable::hash_for_bloom_filter_u32(key, table_id));
149 }
150
151 fn approximate_len(&self) -> usize {
152 self.key_hash_entries.len() * 4
153 }
154
155 fn finish(&mut self, memory_limiter: Option<Arc<MemoryLimiter>>) -> Vec<u8> {
156 let _memory_tracker = memory_limiter.as_ref().map(|memory_limit| {
158 memory_limit.must_require_memory(self.approximate_building_memory() as u64)
159 });
160
161 let k = ((self.bits_per_key as f64) * 0.69) as u32;
163 let k = k.clamp(1, 30);
165 let nbits = (self.key_hash_entries.len() * self.bits_per_key).max(64);
167 let nbytes = nbits.div_ceil(8);
168 let nbits = nbytes * 8;
170 let mut filter = Vec::with_capacity(nbytes + 1);
171 filter.resize(nbytes, 0);
172 for h in &self.key_hash_entries {
173 let mut h = *h;
174 let delta = h.rotate_left(15);
175 for _ in 0..k {
176 let bit_pos = (h as usize) % nbits;
177 filter.set_bit(bit_pos, true);
178 h = h.wrapping_add(delta);
179 }
180 }
181 filter.put_u8(k as u8);
182 self.key_hash_entries.clear();
183 filter
184 }
185
186 fn create(fpr: f64, capacity: usize) -> Self {
187 BloomFilterBuilder::new(fpr, capacity)
188 }
189
190 fn approximate_building_memory(&self) -> usize {
191 self.approximate_len()
192 }
193}
194
#[cfg(test)]
mod tests {
    use std::ops::BitXor;

    use bytes::Bytes;
    use xxhash_rust::xxh32;

    use super::*;

    /// A two-key filter must report both inserted keys and reject two absent ones.
    #[test]
    fn test_small_bloom_filter() {
        let mut builder = BloomFilterBuilder::new(0.01, 2);
        builder.add_key(b"hello", 0);
        builder.add_key(b"world", 0);
        let encoded = builder.finish(None);

        // NOTE(review): presumably mirrors `Sstable::hash_for_bloom_filter_u32(key, 0)`
        // (xxh32 of the key xor-ed with the table id) — confirm against that helper.
        let keys: [&[u8]; 4] = [b"hello", b"world", b"x", b"fool"];
        let check_hash: Vec<u32> = keys
            .iter()
            .map(|key| xxh32::xxh32(key, 0).bitxor(0))
            .collect();

        let reader = BloomFilterReader::new(encoded);
        assert_eq!(reader.k, 6);

        assert!(reader.may_match(check_hash[0]));
        assert!(reader.may_match(check_hash[1]));
        assert!(!reader.may_match(check_hash[2]));
        assert!(!reader.may_match(check_hash[3]));
    }

    /// Builds a filter over `preset_key_count` keys, probes it with `test_key_count`
    /// keys that were never added, and asserts that the observed false-positive rate
    /// stays below three times `expected_false_positive_rate`.
    fn false_positive_rate_case(
        preset_key_count: usize,
        test_key_count: usize,
        expected_false_positive_rate: f64,
    ) {
        let mut builder = BloomFilterBuilder::new(expected_false_positive_rate, preset_key_count);
        for i in 0..preset_key_count {
            let key = Bytes::from(format!("{:032}", i));
            builder.add_key(&key, 0);
        }
        let filter = BloomFilterReader::new(builder.finish(None));

        // Absent keys that the filter correctly rejects.
        let true_negatives = (preset_key_count..preset_key_count + test_key_count)
            .filter(|i| {
                let key = Bytes::from(format!("{:032}", i));
                !filter.may_match(xxh32::xxh32(&key, 0))
            })
            .count();

        let false_positive_rate = 1_f64 - true_negatives as f64 / test_key_count as f64;
        assert!(false_positive_rate < 3_f64 * expected_false_positive_rate);
    }

    /// Checks three target rates on a large key set; ignored by default.
    #[test]
    #[ignore]
    fn test_false_positive_rate() {
        const KEY_COUNT: usize = 1300000;
        const TEST_KEY_COUNT: usize = 100000;
        for rate in [0.1, 0.01, 0.001] {
            false_positive_rate_case(KEY_COUNT, TEST_KEY_COUNT, rate);
        }
    }
}