mod block;

use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::ops::{BitXor, Bound, Range};

pub use block::*;
mod block_iterator;
pub use block_iterator::*;
mod bloom;
mod xor_filter;
pub use bloom::BloomFilterBuilder;
use serde::{Deserialize, Serialize};
pub use xor_filter::{
    BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder, XorFilterReader,
};
pub mod builder;
pub use builder::*;
pub mod writer;
use risingwave_common::catalog::TableId;
pub use writer::*;
mod forward_sstable_iterator;
pub mod multi_builder;
use bytes::{Buf, BufMut};
pub use forward_sstable_iterator::*;
use tracing::warn;
mod backward_sstable_iterator;
pub use backward_sstable_iterator::*;
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};

mod filter;
mod utils;

pub use filter::FilterBuilder;
pub use utils::{CompressionAlgorithm, xxhash64_checksum, xxhash64_verify};
use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
use xxhash_rust::{xxh32, xxh64};

use super::{HummockError, HummockResult};
use crate::hummock::CachePolicy;
use crate::store::ReadOptions;

const MAGIC: u32 = 0x5785ab73;
const OLD_VERSION: u32 = 1;
const VERSION: u32 = 2;

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    pub new_epoch: HummockEpoch,
}

impl MonotonicDeleteEvent {
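    /// Serialized layout (little-endian):
    ///
    /// ```plain
    /// | left user key (length-prefixed) | exclude-left flag (1B, 0 or 1) | new epoch (8B) |
    /// ```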
    pub fn encode(&self, mut buf: impl BufMut) {
        self.event_key
            .left_user_key
            .encode_length_prefixed(&mut buf);
        buf.put_u8(if self.event_key.is_exclude_left_key {
            1
        } else {
            0
        });
        buf.put_u64_le(self.new_epoch);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
        let user_key = UserKey::decode_length_prefixed(buf);
        let exclude_left_key_flag = buf.get_u8();
        let is_exclude_left_key = match exclude_left_key_flag {
            0 => false,
            1 => true,
            _ => panic!("exclusive flag should be either 0 or 1"),
        };
        let new_epoch = buf.get_u64_le();
        Self {
            event_key: PointRange {
                left_user_key: user_key,
                is_exclude_left_key,
            },
            new_epoch,
        }
    }
}

#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    id: HummockSstableObjectId,
    meta: SstableMeta,
}

impl From<SerdeSstable> for Sstable {
    fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
        Sstable::new(id, meta)
    }
}

#[derive(Clone, Serialize, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    pub id: HummockSstableObjectId,
    pub meta: SstableMeta,
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
}

impl Debug for Sstable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sstable")
            .field("id", &self.id)
            .field("meta", &self.meta)
            .finish()
    }
}

impl Sstable {
    pub fn new(id: HummockSstableObjectId, mut meta: SstableMeta) -> Self {
        // Move the filter bytes out of `meta`; the filter reader owns the only copy.
        let filter_data = std::mem::take(&mut meta.bloom_filter);
        let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
        Self {
            id,
            meta,
            filter_reader,
        }
    }

    #[inline(always)]
    pub fn has_bloom_filter(&self) -> bool {
        !self.filter_reader.is_empty()
    }

    /// Returns the byte range of the block within the object and its uncompressed capacity.
    pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
        let block_meta = &self.meta.block_metas[block_index];
        let range =
            block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
        let uncompressed_capacity = block_meta.uncompressed_size as usize;
        (range, uncompressed_capacity)
    }

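    /// XORs the xxh32 hash of the distribution key with the table id, so that
    /// identical keys in different tables probe different filter slots.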
    #[inline(always)]
    pub fn hash_for_bloom_filter_u32(dist_key: &[u8], table_id: u32) -> u32 {
        let dist_key_hash = xxh32::xxh32(dist_key, 0);
        table_id.bitxor(dist_key_hash)
    }

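    /// 64-bit variant of [`Self::hash_for_bloom_filter_u32`], using xxh64 and
    /// widening the table id to `u64` before the XOR.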
    #[inline(always)]
    pub fn hash_for_bloom_filter(dist_key: &[u8], table_id: u32) -> u64 {
        let dist_key_hash = xxh64::xxh64(dist_key, 0);
        (table_id as u64).bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
        self.filter_reader.may_match(user_key_range, hash)
    }

    #[inline(always)]
    pub fn block_count(&self) -> usize {
        self.meta.block_metas.len()
    }

    #[inline(always)]
    pub fn estimate_size(&self) -> usize {
        8 /* id */ + self.filter_reader.estimate_size() + self.meta.encoded_size()
    }
}

#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    pub smallest_key: Vec<u8>,
    pub offset: u32,
    pub len: u32,
    pub uncompressed_size: u32,
    pub total_key_count: u32,
    pub stale_key_count: u32,
}

impl BlockMeta {
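    /// Format (v2, all integers little-endian):
    ///
    /// ```plain
    /// | offset (4B) | len (4B) | uncompressed size (4B) | total key count (4B)
    /// | stale key count (4B) | smallest key len (4B) | smallest key |
    /// ```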
    pub fn encode(&self, mut buf: impl BufMut) {
        buf.put_u32_le(self.offset);
        buf.put_u32_le(self.len);
        buf.put_u32_le(self.uncompressed_size);
        buf.put_u32_le(self.total_key_count);
        buf.put_u32_le(self.stale_key_count);
        put_length_prefixed_slice(buf, &self.smallest_key);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = buf.get_u32_le();
        let stale_key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    /// Decodes the v1 layout, which predates the key-count fields; they
    /// default to zero.
    pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = 0;
        let stale_key_count = 0;
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        // Five u32 fields plus the 4B length prefix of the smallest key.
        24 + self.smallest_key.len()
    }

    pub fn table_id(&self) -> TableId {
        FullKey::decode(&self.smallest_key).user_key.table_id
    }
}

#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    pub block_metas: Vec<BlockMeta>,
    pub bloom_filter: Vec<u8>,
    pub estimated_size: u32,
    pub key_count: u32,
    pub smallest_key: Vec<u8>,
    pub largest_key: Vec<u8>,
    pub meta_offset: u64,
    /// Deprecated range-tombstone events, retained so that metadata written
    /// by older versions can still be decoded.
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    /// Format version of the meta block (see `VERSION` and `OLD_VERSION`).
    pub version: u32,
}

impl SstableMeta {
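    /// Serialized format, mirrored by `decode` (all integers little-endian):
    ///
    /// ```plain
    /// | N (4B) |
    /// | block meta 0 | ... | block meta N-1 |
    /// | bloom filter len (4B) | bloom filter |
    /// | estimated size (4B) | key count (4B) |
    /// | smallest key len (4B) | smallest key |
    /// | largest key len (4B) | largest key |
    /// | K (4B) |
    /// | tombstone event 0 | ... | tombstone event K-1 |
    /// | file offset of this meta block (8B) |
    /// | checksum (8B) | version (4B) | magic (4B) |
    /// ```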
    pub fn encode_to_bytes(&self) -> Vec<u8> {
        let encoded_size = self.encoded_size();
        let mut buf = Vec::with_capacity(encoded_size);
        self.encode_to(&mut buf);
        buf
    }

    pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
        let start = buf.as_ref().len();

        buf.put_u32_le(
            utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "WARN overflow can't convert block_metas_len {} into u32 table {}",
                    self.block_metas.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        for block_meta in &self.block_metas {
            block_meta.encode(&mut buf);
        }
        put_length_prefixed_slice(&mut buf, &self.bloom_filter);
        buf.put_u32_le(self.estimated_size);
        buf.put_u32_le(self.key_count);
        put_length_prefixed_slice(&mut buf, &self.smallest_key);
        put_length_prefixed_slice(&mut buf, &self.largest_key);
        #[expect(deprecated)]
        buf.put_u32_le(
            utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "WARN overflow can't convert monotonic_tombstone_events_len {} into u32 table {}",
                    self.monotonic_tombstone_events.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        #[expect(deprecated)]
        for monotonic_tombstone_event in &self.monotonic_tombstone_events {
            monotonic_tombstone_event.encode(&mut buf);
        }
        buf.put_u64_le(self.meta_offset);

        let end = buf.as_ref().len();

        // The checksum covers everything written above; the trailer itself
        // (checksum, version, magic) is excluded.
        let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
        buf.put_u64_le(checksum);
        buf.put_u32_le(VERSION);
        buf.put_u32_le(MAGIC);
    }

    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
        let mut cursor = buf.len();

        // Read the fixed-size trailer back to front: magic, version, checksum.
        cursor -= 4;
        let magic = (&buf[cursor..]).get_u32_le();
        if magic != MAGIC {
            return Err(HummockError::magic_mismatch(MAGIC, magic));
        }

        cursor -= 4;
        let version = (&buf[cursor..cursor + 4]).get_u32_le();
        if version != VERSION && version != OLD_VERSION {
            return Err(HummockError::invalid_format_version(version));
        }

        cursor -= 8;
        let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
        let buf = &mut &buf[..cursor];
        xxhash64_verify(buf, checksum)?;

        let block_meta_count = buf.get_u32_le() as usize;
        let mut block_metas = Vec::with_capacity(block_meta_count);
        if version == OLD_VERSION {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode_from_v1(buf));
            }
        } else {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode(buf));
            }
        }

        let bloom_filter = get_length_prefixed_slice(buf);
        let estimated_size = buf.get_u32_le();
        let key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        let largest_key = get_length_prefixed_slice(buf);
        let tomb_event_count = buf.get_u32_le() as usize;
        let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
        for _ in 0..tomb_event_count {
            let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
            monotonic_tombstone_events.push(monotonic_tombstone_event);
        }
        let meta_offset = buf.get_u64_le();

        if !monotonic_tombstone_events.is_empty() {
            warn!(
                count = monotonic_tombstone_events.len(),
                tables = ?monotonic_tombstone_events
                    .iter()
                    .map(|event| event.event_key.left_user_key.table_id)
                    .collect::<HashSet<_>>(),
                "read non-empty range tombstones",
            );
        }

        #[expect(deprecated)]
        Ok(Self {
            block_metas,
            bloom_filter,
            estimated_size,
            key_count,
            smallest_key,
            largest_key,
            meta_offset,
            monotonic_tombstone_events,
            version,
        })
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        4 // block meta count
            + self
                .block_metas
                .iter()
                .map(|block_meta| block_meta.encoded_size())
                .sum::<usize>()
            + 4 // tombstone event count
            + 4 // bloom filter len
            + self.bloom_filter.len()
            + 4 // estimated size
            + 4 // key count
            + 4 // smallest key len
            + self.smallest_key.len()
            + 4 // largest key len
            + self.largest_key.len()
            + 8 // meta offset
            + 8 // checksum
            + 4 // version
            + 4 // magic
    }
}

#[derive(Default)]
pub struct SstableIteratorReadOptions {
    pub cache_policy: CachePolicy,
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    pub max_preload_retry_times: usize,
    pub prefetch_for_large_query: bool,
}

impl SstableIteratorReadOptions {
    pub fn from_read_options(read_options: &ReadOptions) -> Self {
        Self {
            cache_policy: read_options.cache_policy,
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: read_options.prefetch_options.for_large_query,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

    #[test]
    fn test_sstable_meta_enc_dec() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![
                BlockMeta {
                    smallest_key: b"0-smallest-key".to_vec(),
                    len: 100,
                    ..Default::default()
                },
                BlockMeta {
                    smallest_key: b"5-some-key".to_vec(),
                    offset: 100,
                    len: 100,
                    ..Default::default()
                },
            ],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let sz = meta.encoded_size();
        let buf = meta.encode_to_bytes();
        assert_eq!(sz, buf.len());
        let decoded_meta = SstableMeta::decode(&buf[..]).unwrap();
        assert_eq!(decoded_meta, meta);

        println!("buf: {}", buf.len());
    }

    #[tokio::test]
    async fn test_sstable_serde() {
        let (_, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            (0..100).map(|x| {
                (
                    iterator_test_key_of(x),
                    HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec()),
                )
            }),
        )
        .await;

        let buffer = bincode::serialize(&meta).unwrap();

        let m: SstableMeta = bincode::deserialize(&buffer).unwrap();

        assert_eq!(meta, m);

        println!("{} vs {}", buffer.len(), meta.encoded_size());
    }
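
    // Two extra sketches, not from the original suite, exercising the smaller
    // codecs above.

    // Round-trip the v2 `BlockMeta` wire format: the encoded length must match
    // `encoded_size()`, and decoding must reproduce the struct.
    #[test]
    fn test_block_meta_enc_dec() {
        let block_meta = BlockMeta {
            smallest_key: b"0-smallest-key".to_vec(),
            offset: 100,
            len: 100,
            uncompressed_size: 0,
            total_key_count: 10,
            stale_key_count: 3,
        };
        let mut buf = Vec::new();
        block_meta.encode(&mut buf);
        assert_eq!(buf.len(), block_meta.encoded_size());
        assert_eq!(BlockMeta::decode(&mut &buf[..]), block_meta);
    }

    // The bloom-filter hash XORs the table id into the key hash, so the same
    // distribution key hashes differently across tables, and two such hashes
    // differ exactly by `table_id_a ^ table_id_b`.
    #[test]
    fn test_bloom_filter_hash_is_table_aware() {
        let key = b"dist-key";
        let h1 = Sstable::hash_for_bloom_filter(key, 1);
        let h2 = Sstable::hash_for_bloom_filter(key, 2);
        assert_ne!(h1, h2);
        assert_eq!(h1 ^ h2, 1 ^ 2);
    }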
}