mod block;

use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::ops::{BitXor, Bound, Range};

pub use block::*;
mod block_iterator;
pub use block_iterator::*;
mod bloom;
mod xor_filter;
pub use bloom::BloomFilterBuilder;
use serde::{Deserialize, Serialize};
pub use xor_filter::{
    BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder, XorFilterReader,
};
pub mod builder;
pub use builder::*;
pub mod writer;
use risingwave_common::catalog::TableId;
pub use writer::*;
mod forward_sstable_iterator;
pub mod multi_builder;
use bytes::{Buf, BufMut};
pub use forward_sstable_iterator::*;
use tracing::warn;
mod backward_sstable_iterator;
pub use backward_sstable_iterator::*;
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};

mod filter;
mod utils;

pub use filter::FilterBuilder;
pub use utils::{CompressionAlgorithm, xxhash64_checksum, xxhash64_verify};
use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
use xxhash_rust::{xxh32, xxh64};

use super::{HummockError, HummockResult};
use crate::hummock::CachePolicy;
use crate::store::ReadOptions;

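// Footer of the encoded `SstableMeta` (see `SstableMeta::encode_to` / `decode`):
// | xxhash64 checksum (8B) | version (4B) | magic (4B) |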
const MAGIC: u32 = 0x5785ab73;
const OLD_VERSION: u32 = 1;
const VERSION: u32 = 2;

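/// A legacy range-tombstone event. New SSTables no longer write these, but the type
/// is kept so that metadata containing the deprecated
/// `SstableMeta::monotonic_tombstone_events` can still be decoded.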
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    pub new_epoch: HummockEpoch,
}

impl MonotonicDeleteEvent {
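    /// Format: | left user key (length-prefixed) | exclude-left flag (1B) | new epoch (8B LE) |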
    pub fn encode(&self, mut buf: impl BufMut) {
        self.event_key
            .left_user_key
            .encode_length_prefixed(&mut buf);
        buf.put_u8(if self.event_key.is_exclude_left_key {
            1
        } else {
            0
        });
        buf.put_u64_le(self.new_epoch);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
        let user_key = UserKey::decode_length_prefixed(buf);
        let exclude_left_key_flag = buf.get_u8();
        let is_exclude_left_key = match exclude_left_key_flag {
            0 => false,
            1 => true,
            _ => panic!("exclusive flag should be either 0 or 1"),
        };
        let new_epoch = buf.get_u64_le();
        Self {
            event_key: PointRange {
                left_user_key: user_key,
                is_exclude_left_key,
            },
            new_epoch,
        }
    }
}

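/// Serde surrogate for [`Sstable`]. On deserialization the xor filter reader is
/// rebuilt from `meta.bloom_filter` via [`Sstable::new`] (see the `From` impl below).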
#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    id: HummockSstableObjectId,
    meta: SstableMeta,
}

impl From<SerdeSstable> for Sstable {
    fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
        Sstable::new(id, meta, false)
    }
}

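/// An in-memory handle to an SSTable: the object id, its decoded metadata, and a
/// filter reader built from the raw filter bytes taken out of `meta.bloom_filter`.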
#[derive(Clone, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    pub id: HummockSstableObjectId,
    pub meta: SstableMeta,
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
    #[serde(skip)]
    skip_bloom_filter_in_serde: bool,
}

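// Serialization goes through `SerdeSstable`. Since `Sstable::new` moves the filter
// bytes out of `meta.bloom_filter`, they are re-encoded from the filter reader here,
// unless `skip_bloom_filter_in_serde` asks for a filter-less payload.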
impl Serialize for Sstable {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut serde_sstable = SerdeSstable {
            id: self.id,
            meta: self.meta.clone(),
        };
        if !self.skip_bloom_filter_in_serde {
            serde_sstable.meta.bloom_filter = self.filter_reader.encode_to_bytes();
        }
        serde_sstable.serialize(serializer)
    }
}

impl Debug for Sstable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sstable")
            .field("id", &self.id)
            .field("meta", &self.meta)
            .finish()
    }
}

impl Sstable {
    pub fn new(
        id: HummockSstableObjectId,
        mut meta: SstableMeta,
        skip_bloom_filter_in_serde: bool,
    ) -> Self {
        let filter_data = std::mem::take(&mut meta.bloom_filter);
        let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
        Self {
            id,
            meta,
            filter_reader,
            skip_bloom_filter_in_serde,
        }
    }

    #[inline(always)]
    pub fn has_bloom_filter(&self) -> bool {
        !self.filter_reader.is_empty()
    }

    pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
        let block_meta = &self.meta.block_metas[block_index];
        let range =
            block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
        let uncompressed_capacity = block_meta.uncompressed_size as usize;
        (range, uncompressed_capacity)
    }

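    /// 32-bit filter hash: xxh32 of the distribution key, XORed with the table id
    /// so the same key in different tables yields different filter hashes.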
    #[inline(always)]
    pub fn hash_for_bloom_filter_u32(dist_key: &[u8], table_id: u32) -> u32 {
        let dist_key_hash = xxh32::xxh32(dist_key, 0);
        table_id.bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn hash_for_bloom_filter(dist_key: &[u8], table_id: u32) -> u64 {
        let dist_key_hash = xxh64::xxh64(dist_key, 0);
        (table_id as u64).bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
        self.filter_reader.may_match(user_key_range, hash)
    }

    #[inline(always)]
    pub fn block_count(&self) -> usize {
        self.meta.block_metas.len()
    }

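    /// Estimated in-memory size: 8 bytes for the id plus the filter and meta sizes.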
    #[inline(always)]
    pub fn estimate_size(&self) -> usize {
        8 + self.filter_reader.estimate_size() + self.meta.encoded_size()
    }
}

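/// Per-block metadata: where the block sits in the object, its sizes, key counts,
/// and its smallest key. See `encode` for the serialized layout.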
#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    pub smallest_key: Vec<u8>,
    pub offset: u32,
    pub len: u32,
    pub uncompressed_size: u32,
    pub total_key_count: u32,
    pub stale_key_count: u32,
}

impl BlockMeta {
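    /// Format (all integers little-endian):
    ///
    /// ```plain
    /// | offset (4B) | len (4B) | uncompressed size (4B) | total key count (4B) | stale key count (4B) | smallest key len (4B) | smallest key |
    /// ```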
    pub fn encode(&self, mut buf: impl BufMut) {
        buf.put_u32_le(self.offset);
        buf.put_u32_le(self.len);
        buf.put_u32_le(self.uncompressed_size);
        buf.put_u32_le(self.total_key_count);
        buf.put_u32_le(self.stale_key_count);
        put_length_prefixed_slice(buf, &self.smallest_key);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = buf.get_u32_le();
        let stale_key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

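    /// Decodes the version-1 layout, which lacks the key-count fields;
    /// `total_key_count` and `stale_key_count` default to zero.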
    pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = 0;
        let stale_key_count = 0;
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

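    /// 20 bytes of fixed fields (five `u32`s) plus the 4-byte length prefix
    /// and the bytes of `smallest_key`.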
    #[inline]
    pub fn encoded_size(&self) -> usize {
        24 + self.smallest_key.len()
    }

    pub fn table_id(&self) -> TableId {
        FullKey::decode(&self.smallest_key).user_key.table_id
    }
}

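/// Metadata of an SSTable. `meta_offset` records where the encoded meta starts
/// within the SST object; see `encode_to` for the serialized layout.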
#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    pub block_metas: Vec<BlockMeta>,
    pub bloom_filter: Vec<u8>,
    pub estimated_size: u32,
    pub key_count: u32,
    pub smallest_key: Vec<u8>,
    pub largest_key: Vec<u8>,
    pub meta_offset: u64,
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    pub version: u32,
}

impl SstableMeta {
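    /// Format (all integers little-endian):
    ///
    /// ```plain
    /// | N (4B) | block meta 0 | ... | block meta N-1 |
    /// | bloom filter len (4B) | bloom filter |
    /// | estimated size (4B) | key count (4B) |
    /// | smallest key len (4B) | smallest key |
    /// | largest key len (4B) | largest key |
    /// | K (4B) | tombstone event 0 | ... | tombstone event K-1 | (deprecated; K is expected to be 0)
    /// | meta offset (8B) |
    /// | checksum (8B) | version (4B) | magic (4B) |
    /// ```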
    pub fn encode_to_bytes(&self) -> Vec<u8> {
        let encoded_size = self.encoded_size();
        let mut buf = Vec::with_capacity(encoded_size);
        self.encode_to(&mut buf);
        buf
    }

    pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
        let start = buf.as_ref().len();

        buf.put_u32_le(
            utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "overflow: cannot convert block_metas.len() {} into u32, table {}",
                    self.block_metas.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        for block_meta in &self.block_metas {
            block_meta.encode(&mut buf);
        }
        put_length_prefixed_slice(&mut buf, &self.bloom_filter);
        buf.put_u32_le(self.estimated_size);
        buf.put_u32_le(self.key_count);
        put_length_prefixed_slice(&mut buf, &self.smallest_key);
        put_length_prefixed_slice(&mut buf, &self.largest_key);
        #[expect(deprecated)]
        buf.put_u32_le(
            utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "overflow: cannot convert monotonic_tombstone_events.len() {} into u32, table {}",
                    self.monotonic_tombstone_events.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        #[expect(deprecated)]
        for monotonic_tombstone_event in &self.monotonic_tombstone_events {
            monotonic_tombstone_event.encode(&mut buf);
        }
        buf.put_u64_le(self.meta_offset);

        let end = buf.as_ref().len();

        // The checksum covers everything written above; the footer itself is excluded.
        let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
        buf.put_u64_le(checksum);
        buf.put_u32_le(VERSION);
        buf.put_u32_le(MAGIC);
    }

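    /// Decodes a buffer produced by `encode_to`: the footer (magic, version,
    /// checksum) is read from the back and validated first, then the fields are
    /// decoded front to back. Version-1 block metas use `BlockMeta::decode_from_v1`.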
    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
        let mut cursor = buf.len();

        cursor -= 4;
        let magic = (&buf[cursor..cursor + 4]).get_u32_le();
        if magic != MAGIC {
            return Err(HummockError::magic_mismatch(MAGIC, magic));
        }

        cursor -= 4;
        let version = (&buf[cursor..cursor + 4]).get_u32_le();
        if version != VERSION && version != OLD_VERSION {
            return Err(HummockError::invalid_format_version(version));
        }

        cursor -= 8;
        let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
        let buf = &mut &buf[..cursor];
        xxhash64_verify(buf, checksum)?;

        let block_meta_count = buf.get_u32_le() as usize;
        let mut block_metas = Vec::with_capacity(block_meta_count);
        if version == OLD_VERSION {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode_from_v1(buf));
            }
        } else {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode(buf));
            }
        }

        let bloom_filter = get_length_prefixed_slice(buf);
        let estimated_size = buf.get_u32_le();
        let key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        let largest_key = get_length_prefixed_slice(buf);
        let tomb_event_count = buf.get_u32_le() as usize;
        let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
        for _ in 0..tomb_event_count {
            let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
            monotonic_tombstone_events.push(monotonic_tombstone_event);
        }
        let meta_offset = buf.get_u64_le();

        if !monotonic_tombstone_events.is_empty() {
            warn!(
                count = monotonic_tombstone_events.len(),
                tables = ?monotonic_tombstone_events
                    .iter()
                    .map(|event| event.event_key.left_user_key.table_id)
                    .collect::<HashSet<_>>(),
                "read non-empty range tombstones");
        }

        #[expect(deprecated)]
        Ok(Self {
            block_metas,
            bloom_filter,
            estimated_size,
            key_count,
            smallest_key,
            largest_key,
            meta_offset,
            monotonic_tombstone_events,
            version,
        })
    }

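    /// Note: the deprecated tombstone list is assumed empty, so only its 4-byte
    /// count is accounted for here.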
    #[inline]
    pub fn encoded_size(&self) -> usize {
        4 // block meta count
            + self
                .block_metas
                .iter()
                .map(|block_meta| block_meta.encoded_size())
                .sum::<usize>()
            + 4 // bloom filter len
            + self.bloom_filter.len()
            + 4 // estimated size
            + 4 // key count
            + 4 // smallest key len
            + self.smallest_key.len()
            + 4 // largest key len
            + self.largest_key.len()
            + 4 // tombstone event count
            + 8 // meta offset
            + 8 // checksum
            + 4 // version
            + 4 // magic
    }
}

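/// Read options for a single SSTable iterator, typically derived from the
/// store-level [`ReadOptions`] via `from_read_options`.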
#[derive(Default)]
pub struct SstableIteratorReadOptions {
    pub cache_policy: CachePolicy,
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    pub max_preload_retry_times: usize,
    pub prefetch_for_large_query: bool,
}

impl SstableIteratorReadOptions {
    pub fn from_read_options(read_options: &ReadOptions) -> Self {
        Self {
            cache_policy: read_options.cache_policy,
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: read_options.prefetch_options.for_large_query,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

    #[test]
    fn test_sstable_meta_enc_dec() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![
                BlockMeta {
                    smallest_key: b"0-smallest-key".to_vec(),
                    len: 100,
                    ..Default::default()
                },
                BlockMeta {
                    smallest_key: b"5-some-key".to_vec(),
                    offset: 100,
                    len: 100,
                    ..Default::default()
                },
            ],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let sz = meta.encoded_size();
        let buf = meta.encode_to_bytes();
        assert_eq!(sz, buf.len());
        let decoded_meta = SstableMeta::decode(&buf[..]).unwrap();
        assert_eq!(decoded_meta, meta);

        println!("buf: {}", buf.len());
    }

    #[tokio::test]
    async fn test_sstable_serde() {
        let (_, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            (0..100).map(|x| {
                (
                    iterator_test_key_of(x),
                    HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec()),
                )
            }),
        )
        .await;

        // With `skip_bloom_filter_in_serde` set, the filter is dropped on the wire:
        // the deserialized sstable comes back with an empty filter reader.
        let sstable = Sstable::new(42.into(), meta.clone(), true);

        let buffer = bincode::serialize(&sstable).unwrap();

        let s: Sstable = bincode::deserialize(&buffer).unwrap();

        assert_eq!(s.id, sstable.id);
        assert_eq!(s.meta, sstable.meta);
        assert!(!sstable.filter_reader.is_empty());
        assert!(s.filter_reader.is_empty());

        // Without the flag, the filter round-trips intact.
        let sstable = Sstable::new(42.into(), meta, false);

        let buffer = bincode::serialize(&sstable).unwrap();

        let s: Sstable = bincode::deserialize(&buffer).unwrap();

        assert_eq!(s.id, sstable.id);
        assert_eq!(s.meta, sstable.meta);
        assert_eq!(
            s.filter_reader.encode_to_bytes(),
            sstable.filter_reader.encode_to_bytes()
        );
    }
}