1mod block;
19
20use std::collections::HashSet;
21use std::fmt::{Debug, Formatter};
22use std::ops::{BitXor, Bound, Range};
23
24pub use block::*;
25mod block_iterator;
26pub use block_iterator::*;
27mod xor_filter;
28use serde::{Deserialize, Serialize};
29pub use xor_filter::{
30 BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder,
31 XorFilterReader,
32};
33pub mod builder;
34pub use builder::*;
35pub mod writer;
36use risingwave_common::catalog::TableId;
37pub use writer::*;
38mod forward_sstable_iterator;
39pub mod multi_builder;
40use bytes::{Buf, BufMut};
41pub use forward_sstable_iterator::*;
42use tracing::warn;
43mod backward_sstable_iterator;
44pub use backward_sstable_iterator::*;
45use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
46use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};
47
48mod filter;
49mod utils;
50
51pub use filter::{DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP, FilterBuilder, FilterBuilderOptions};
52pub use utils::{CompressionAlgorithm, xxhash64_checksum, xxhash64_verify};
53use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
54use xxhash_rust::xxh64;
55
56use super::{HummockError, HummockResult};
57use crate::hummock::CachePolicy;
58use crate::store::ReadOptions;
59
/// Trailing `u32` of every encoded `SstableMeta`; `SstableMeta::decode` rejects any
/// buffer whose last four bytes do not match it.
const MAGIC: u32 = 0x5785ab73;
/// Legacy format version whose block metas carry no key statistics; decoded through
/// `BlockMeta::decode_from_v1`.
const OLD_VERSION: u32 = 1;
/// Format version written by `SstableMeta::encode_to`.
const VERSION: u32 = 2;
63
/// A legacy range-tombstone event stored in `SstableMeta::monotonic_tombstone_events`.
///
/// That field is deprecated; this type is kept so that metas which still carry such
/// events can be encoded and decoded.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    /// Left boundary of the event: `left_user_key` plus the `is_exclude_left_key` flag.
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    /// Epoch attached to the event (presumably the delete epoch in effect from
    /// `event_key` onward — TODO confirm).
    pub new_epoch: HummockEpoch,
}
83
84impl MonotonicDeleteEvent {
85 pub fn encode(&self, mut buf: impl BufMut) {
86 self.event_key
87 .left_user_key
88 .encode_length_prefixed(&mut buf);
89 buf.put_u8(if self.event_key.is_exclude_left_key {
90 1
91 } else {
92 0
93 });
94 buf.put_u64_le(self.new_epoch);
95 }
96
97 pub fn decode(buf: &mut &[u8]) -> Self {
98 use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
99 let user_key = UserKey::decode_length_prefixed(buf);
100 let exclude_left_key_flag = buf.get_u8();
101 let is_exclude_left_key = match exclude_left_key_flag {
102 0 => false,
103 1 => true,
104 _ => panic!("exclusive flag should be either 0 or 1"),
105 };
106 let new_epoch = buf.get_u64_le();
107 Self {
108 event_key: PointRange {
109 left_user_key: user_key,
110 is_exclude_left_key,
111 },
112 new_epoch,
113 }
114 }
115}
116
/// Serde proxy for `Sstable`: only the object id and the meta are (de)serialized.
/// The filter, if present, travels inside `meta.bloom_filter`.
#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    id: HummockSstableObjectId,
    meta: SstableMeta,
}
122
123impl From<SerdeSstable> for Sstable {
124 fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
125 Sstable::new(id, meta, false)
128 }
129}
130
/// An SST object's id and decoded meta, with its filter split out into a reader.
///
/// Deserialization goes through the private `SerdeSstable` proxy. Serialization is
/// implemented by hand so that writing the filter can be skipped.
#[derive(Clone, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    pub id: HummockSstableObjectId,
    /// Decoded meta. `Sstable::new` moves `bloom_filter` out of it, so the filter bytes
    /// are held by `filter_reader` rather than here.
    pub meta: SstableMeta,
    /// Reader built from the meta's `bloom_filter` bytes; `has_filter()` reports whether
    /// it is non-empty.
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
    /// When `true`, the `Serialize` impl leaves the filter out of the serialized meta.
    #[serde(skip)]
    skip_bloom_filter_in_serde: bool,
}
145
146impl Serialize for Sstable {
147 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
148 where
149 S: serde::Serializer,
150 {
151 let mut serde_sstable = SerdeSstable {
152 id: self.id,
153 meta: self.meta.clone(),
154 };
155 if !self.skip_bloom_filter_in_serde {
156 serde_sstable.meta.bloom_filter = self.filter_reader.encode_to_bytes();
157 }
158 serde_sstable.serialize(serializer)
159 }
160}
161
162impl Debug for Sstable {
163 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164 f.debug_struct("Sstable")
165 .field("id", &self.id)
166 .field("meta", &self.meta)
167 .finish()
168 }
169}
170
171impl Sstable {
172 pub fn new(
173 id: HummockSstableObjectId,
174 mut meta: SstableMeta,
175 skip_bloom_filter_in_serde: bool,
176 ) -> Self {
177 let filter_data = std::mem::take(&mut meta.bloom_filter);
178 let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
179 Self {
180 id,
181 meta,
182 filter_reader,
183 skip_bloom_filter_in_serde,
184 }
185 }
186
187 #[inline(always)]
188 pub fn has_filter(&self) -> bool {
189 !self.filter_reader.is_empty()
190 }
191
192 pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
193 let block_meta = &self.meta.block_metas[block_index];
194 let range =
195 block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
196 let uncompressed_capacity = block_meta.uncompressed_size as usize;
197 (range, uncompressed_capacity)
198 }
199
200 #[inline(always)]
201 pub fn hash_for_filter(dist_key: &[u8], table_id: u32) -> u64 {
202 let dist_key_hash = xxh64::xxh64(dist_key, 0);
203 (table_id as u64).bitxor(dist_key_hash)
205 }
206
207 #[inline(always)]
208 pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
209 self.filter_reader
210 .may_match(&self.meta.block_metas, user_key_range, hash)
211 }
212
213 #[inline(always)]
214 pub fn block_count(&self) -> usize {
215 self.meta.block_metas.len()
216 }
217
218 #[inline(always)]
219 pub fn estimated_meta_cache_memory_weight(&self) -> usize {
220 std::mem::size_of::<Self>()
223 + self.meta.estimated_heap_size()
224 + self.filter_reader.estimated_heap_size()
225 }
226}
227
/// Per-block entry of an `SstableMeta`.
#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    /// Encoded full key recorded as the block's smallest key; `table_id()` decodes it
    /// with `FullKey::decode`.
    pub smallest_key: Vec<u8>,
    /// Start of the block's byte range in the object (see `Sstable::calculate_block_info`).
    pub offset: u32,
    /// Length of the block's byte range in the object.
    pub len: u32,
    /// Capacity reserved when decompressing the block (see `Sstable::calculate_block_info`).
    pub uncompressed_size: u32,
    /// Key-count statistic recorded by the writer; absent from the v1 format and
    /// decoded as `0` there.
    pub total_key_count: u32,
    /// Stale-key-count statistic recorded by the writer; absent from the v1 format and
    /// decoded as `0` there.
    pub stale_key_count: u32,
}
237
238impl BlockMeta {
239 pub fn encode(&self, mut buf: impl BufMut) {
245 buf.put_u32_le(self.offset);
246 buf.put_u32_le(self.len);
247 buf.put_u32_le(self.uncompressed_size);
248 buf.put_u32_le(self.total_key_count);
249 buf.put_u32_le(self.stale_key_count);
250 put_length_prefixed_slice(buf, &self.smallest_key);
251 }
252
253 pub fn decode(buf: &mut &[u8]) -> Self {
254 let offset = buf.get_u32_le();
255 let len = buf.get_u32_le();
256 let uncompressed_size = buf.get_u32_le();
257
258 let total_key_count = buf.get_u32_le();
259 let stale_key_count = buf.get_u32_le();
260 let smallest_key = get_length_prefixed_slice(buf);
261 Self {
262 smallest_key,
263 offset,
264 len,
265 uncompressed_size,
266 total_key_count,
267 stale_key_count,
268 }
269 }
270
271 pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
272 let offset = buf.get_u32_le();
273 let len = buf.get_u32_le();
274 let uncompressed_size = buf.get_u32_le();
275 let total_key_count = 0;
276 let stale_key_count = 0;
277 let smallest_key = get_length_prefixed_slice(buf);
278 Self {
279 smallest_key,
280 offset,
281 len,
282 uncompressed_size,
283 total_key_count,
284 stale_key_count,
285 }
286 }
287
288 #[inline]
289 pub fn encoded_size(&self) -> usize {
290 24 + self.smallest_key.len()
291 }
292
293 pub fn table_id(&self) -> TableId {
294 FullKey::decode(&self.smallest_key).user_key.table_id
295 }
296
297 fn estimated_heap_size(&self) -> usize {
298 self.smallest_key.capacity()
299 }
300}
301
/// Decoded SST metadata; `SstableMeta::encode_to` documents the binary layout.
#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    pub block_metas: Vec<BlockMeta>,
    /// Serialized filter bytes. `Sstable::new` moves them out (leaving this empty) to
    /// build its filter reader.
    pub bloom_filter: Vec<u8>,
    pub estimated_size: u32,
    pub key_count: u32,
    pub smallest_key: Vec<u8>,
    pub largest_key: Vec<u8>,
    pub meta_offset: u64,
    /// Legacy range-tombstone events. They are still encoded and decoded for format
    /// compatibility, and `SstableMeta::decode` logs a warning when it reads a non-empty list.
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    /// Format version this meta was decoded with (`OLD_VERSION` or `VERSION`).
    /// `encode_to` ignores this field and always writes `VERSION`.
    pub version: u32,
}
329
330impl SstableMeta {
331 pub fn encode_to_bytes(&self) -> Vec<u8> {
346 let encoded_size = self.encoded_size();
347 let mut buf = Vec::with_capacity(encoded_size);
348 self.encode_to(&mut buf);
349 buf
350 }
351
352 pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
353 let start = buf.as_ref().len();
354
355 buf.put_u32_le(
356 utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
357 let tmp_full_key = FullKey::decode(&self.smallest_key);
358 panic!(
359 "WARN overflow can't convert block_metas_len {} into u32 table {}",
360 self.block_metas.len(),
361 tmp_full_key.user_key.table_id,
362 )
363 }),
364 );
365 for block_meta in &self.block_metas {
366 block_meta.encode(&mut buf);
367 }
368 put_length_prefixed_slice(&mut buf, &self.bloom_filter);
369 buf.put_u32_le(self.estimated_size);
370 buf.put_u32_le(self.key_count);
371 put_length_prefixed_slice(&mut buf, &self.smallest_key);
372 put_length_prefixed_slice(&mut buf, &self.largest_key);
373 #[expect(deprecated)]
374 buf.put_u32_le(
375 utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
376 let tmp_full_key = FullKey::decode(&self.smallest_key);
377 panic!(
378 "WARN overflow can't convert monotonic_tombstone_events_len {} into u32 table {}",
379 self.monotonic_tombstone_events.len(),
380 tmp_full_key.user_key.table_id,
381 )
382 }),
383 );
384 #[expect(deprecated)]
385 for monotonic_tombstone_event in &self.monotonic_tombstone_events {
386 monotonic_tombstone_event.encode(&mut buf);
387 }
388 buf.put_u64_le(self.meta_offset);
389
390 let end = buf.as_ref().len();
391
392 let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
393 buf.put_u64_le(checksum);
394 buf.put_u32_le(VERSION);
395 buf.put_u32_le(MAGIC);
396 }
397
398 pub fn decode(buf: &[u8]) -> HummockResult<Self> {
399 let mut cursor = buf.len();
400
401 cursor -= 4;
402 let magic = (&buf[cursor..]).get_u32_le();
403 if magic != MAGIC {
404 return Err(HummockError::magic_mismatch(MAGIC, magic));
405 }
406
407 cursor -= 4;
408 let version = (&buf[cursor..cursor + 4]).get_u32_le();
409 if version != VERSION && version != OLD_VERSION {
410 return Err(HummockError::invalid_format_version(version));
411 }
412
413 cursor -= 8;
414 let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
415 let buf = &mut &buf[..cursor];
416 xxhash64_verify(buf, checksum)?;
417
418 let block_meta_count = buf.get_u32_le() as usize;
419 let mut block_metas = Vec::with_capacity(block_meta_count);
420 if version == OLD_VERSION {
421 for _ in 0..block_meta_count {
422 block_metas.push(BlockMeta::decode_from_v1(buf));
423 }
424 } else {
425 for _ in 0..block_meta_count {
426 block_metas.push(BlockMeta::decode(buf));
427 }
428 }
429
430 let bloom_filter = get_length_prefixed_slice(buf);
431 let estimated_size = buf.get_u32_le();
432 let key_count = buf.get_u32_le();
433 let smallest_key = get_length_prefixed_slice(buf);
434 let largest_key = get_length_prefixed_slice(buf);
435 let tomb_event_count = buf.get_u32_le() as usize;
436 let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
437 for _ in 0..tomb_event_count {
438 let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
439 monotonic_tombstone_events.push(monotonic_tombstone_event);
440 }
441 let meta_offset = buf.get_u64_le();
442
443 if !monotonic_tombstone_events.is_empty() {
444 warn!(
445 count = monotonic_tombstone_events.len(),
446 tables = ?monotonic_tombstone_events
447 .iter()
448 .map(|event| event.event_key.left_user_key.table_id)
449 .collect::<HashSet<_>>(),
450 "read non-empty range tombstones");
451 }
452
453 #[expect(deprecated)]
454 Ok(Self {
455 block_metas,
456 bloom_filter,
457 estimated_size,
458 key_count,
459 smallest_key,
460 largest_key,
461 meta_offset,
462 monotonic_tombstone_events,
463 version,
464 })
465 }
466
467 #[inline]
468 pub fn encoded_size(&self) -> usize {
469 4 + self
471 .block_metas
472 .iter()
473 .map(|block_meta| block_meta.encoded_size())
474 .sum::<usize>()
475 + 4 + 4 + self.bloom_filter.len()
478 + 4 + 4 + 4 + self.smallest_key.len()
482 + 4 + self.largest_key.len()
484 + 8 + 8 + 4 + 4 }
489
490 #[expect(
491 deprecated,
492 reason = "monotonic_tombstone_events is deprecated but still contributes to decoded meta heap size"
493 )]
494 fn estimated_heap_size(&self) -> usize {
495 self.block_metas.capacity() * std::mem::size_of::<BlockMeta>()
496 + self
497 .block_metas
498 .iter()
499 .map(BlockMeta::estimated_heap_size)
500 .sum::<usize>()
501 + self.bloom_filter.capacity()
502 + self.smallest_key.capacity()
503 + self.largest_key.capacity()
504 + self.monotonic_tombstone_events.capacity()
505 * std::mem::size_of::<MonotonicDeleteEvent>()
506 }
507}
508
/// Read options for SST iterators; `from_read_options` derives them from store-level
/// `ReadOptions`.
#[derive(Default)]
pub struct SstableIteratorReadOptions {
    /// Copied from `ReadOptions::cache_policy` by `from_read_options`.
    pub cache_policy: CachePolicy,
    /// Optional end user-key bound (presumably a bound the iterator is expected to reach —
    /// TODO confirm); `None` when built by `from_read_options`.
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    /// Preload retry budget; `0` when built by `from_read_options`.
    pub max_preload_retry_times: usize,
    /// Copied from `ReadOptions::prefetch_options.for_large_query` by `from_read_options`.
    pub prefetch_for_large_query: bool,
}
516
517impl SstableIteratorReadOptions {
518 pub fn from_read_options(read_options: &ReadOptions) -> Self {
519 Self {
520 cache_policy: read_options.cache_policy,
521 must_iterated_end_user_key: None,
522 max_preload_retry_times: 0,
523 prefetch_for_large_query: read_options.prefetch_options.for_large_query,
524 }
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

    /// Round-trips an `SstableMeta` through `encode_to_bytes` / `decode`, and checks that
    /// `encoded_size` predicts the encoded length exactly.
    #[test]
    fn test_sstable_meta_enc_dec() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![
                BlockMeta {
                    smallest_key: b"0-smallest-key".to_vec(),
                    len: 100,
                    ..Default::default()
                },
                BlockMeta {
                    smallest_key: b"5-some-key".to_vec(),
                    offset: 100,
                    len: 100,
                    ..Default::default()
                },
            ],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let sz = meta.encoded_size();
        let buf = meta.encode_to_bytes();
        assert_eq!(sz, buf.len());
        let decoded_meta = SstableMeta::decode(&buf[..]).unwrap();
        assert_eq!(decoded_meta, meta);
    }

    /// Corrupting the magic, the version or the checksummed payload must make `decode`
    /// return an error rather than a meta.
    #[test]
    fn test_sstable_meta_decode_rejects_corruption() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![BlockMeta {
                smallest_key: b"0-smallest-key".to_vec(),
                len: 100,
                ..Default::default()
            }],
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            monotonic_tombstone_events: vec![],
            version: VERSION,
            ..Default::default()
        };
        let buf = meta.encode_to_bytes();
        assert!(SstableMeta::decode(&buf).is_ok());

        // Magic number: the last 4 bytes.
        let mut bad_magic = buf.clone();
        *bad_magic.last_mut().unwrap() ^= 0xff;
        assert!(SstableMeta::decode(&bad_magic).is_err());

        // Format version: the 4 bytes just before the magic.
        let mut bad_version = buf.clone();
        let version_at = bad_version.len() - 8;
        bad_version[version_at..version_at + 4].copy_from_slice(&(VERSION + 1).to_le_bytes());
        assert!(SstableMeta::decode(&bad_version).is_err());

        // Payload byte covered by the checksum.
        let mut bad_payload = buf;
        bad_payload[0] ^= 0xff;
        assert!(SstableMeta::decode(&bad_payload).is_err());
    }

    /// Serde round trip of `Sstable`, both with the filter skipped and with it included.
    #[tokio::test]
    async fn test_sstable_serde() {
        let (_, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            (0..100).map(|x| {
                (
                    iterator_test_key_of(x),
                    HummockValue::put(format!("overlapped_new_{x}").as_bytes().to_vec()),
                )
            }),
        )
        .await;

        // Filter skipped: the deserialized value has no filter.
        let sstable = Sstable::new(42.into(), meta.clone(), true);

        let buffer = bincode::serialize(&sstable).unwrap();

        let s: Sstable = bincode::deserialize(&buffer).unwrap();

        assert_eq!(s.id, sstable.id);
        assert_eq!(s.meta, sstable.meta);
        assert!(!sstable.filter_reader.is_empty());
        assert!(s.filter_reader.is_empty());

        // Filter included: the deserialized filter matches the original byte for byte.
        let sstable = Sstable::new(42.into(), meta, false);

        let buffer = bincode::serialize(&sstable).unwrap();

        let s: Sstable = bincode::deserialize(&buffer).unwrap();

        assert_eq!(s.id, sstable.id);
        assert_eq!(s.meta, sstable.meta);
        assert_eq!(
            s.filter_reader.encode_to_bytes(),
            sstable.filter_reader.encode_to_bytes()
        );
    }
}