1mod block;
19
20use std::collections::HashSet;
21use std::fmt::{Debug, Formatter};
22use std::ops::{BitXor, Bound, Range};
23
24pub use block::*;
25mod block_iterator;
26pub use block_iterator::*;
27mod xor_filter;
28use serde::{Deserialize, Serialize};
29pub use xor_filter::{
30 BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder,
31 XorFilterReader,
32};
33pub mod builder;
34pub use builder::*;
35pub mod writer;
36use risingwave_common::catalog::TableId;
37pub use writer::*;
38mod forward_sstable_iterator;
39pub mod multi_builder;
40use bytes::{Buf, BufMut};
41pub use forward_sstable_iterator::*;
42use tracing::warn;
43mod backward_sstable_iterator;
44pub use backward_sstable_iterator::*;
45use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
46use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};
47
48mod filter;
49mod utils;
50
51pub use filter::FilterBuilder;
52pub use utils::{CompressionAlgorithm, xxhash64_checksum, xxhash64_verify};
53use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
54use xxhash_rust::xxh64;
55
56use super::{HummockError, HummockResult};
57use crate::hummock::CachePolicy;
58use crate::store::ReadOptions;
59
/// Magic number written as the last four bytes of an encoded `SstableMeta` and checked first by
/// `SstableMeta::decode`.
const MAGIC: u32 = 0x5785ab73;
/// Legacy meta format version that `SstableMeta::decode` still accepts; block metas of this
/// version are read with `BlockMeta::decode_from_v1`.
const OLD_VERSION: u32 = 1;
/// Meta format version written by `SstableMeta::encode_to`.
const VERSION: u32 = 2;
63
/// A single range-tombstone event, as stored in the deprecated
/// `SstableMeta::monotonic_tombstone_events` list.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    /// Position of the event: a left user key plus a flag telling whether that key is excluded.
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    /// Epoch attached to the event; encoded as a little-endian `u64`.
    // NOTE(review): the exact meaning of "new" epoch is not visible in this file — confirm.
    pub new_epoch: HummockEpoch,
}
83
84impl MonotonicDeleteEvent {
85 pub fn encode(&self, mut buf: impl BufMut) {
86 self.event_key
87 .left_user_key
88 .encode_length_prefixed(&mut buf);
89 buf.put_u8(if self.event_key.is_exclude_left_key {
90 1
91 } else {
92 0
93 });
94 buf.put_u64_le(self.new_epoch);
95 }
96
97 pub fn decode(buf: &mut &[u8]) -> Self {
98 use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
99 let user_key = UserKey::decode_length_prefixed(buf);
100 let exclude_left_key_flag = buf.get_u8();
101 let is_exclude_left_key = match exclude_left_key_flag {
102 0 => false,
103 1 => true,
104 _ => panic!("exclusive flag should be either 0 or 1"),
105 };
106 let new_epoch = buf.get_u64_le();
107 Self {
108 event_key: PointRange {
109 left_user_key: user_key,
110 is_exclude_left_key,
111 },
112 new_epoch,
113 }
114 }
115}
116
/// Serde-facing shape of an `Sstable`: only the object id and the meta.
///
/// `Sstable` deserializes through this struct (`#[serde(from = "SerdeSstable")]`) and its manual
/// `Serialize` implementation serializes a value of this type.
#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    id: HummockSstableObjectId,
    meta: SstableMeta,
}
122
123impl From<SerdeSstable> for Sstable {
124 fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
125 Sstable::new(id, meta, false)
128 }
129}
130
/// An SSTable's object id and meta, together with a filter reader built from the meta.
///
/// Deserialization goes through `SerdeSstable`; serialization is implemented by hand.
#[derive(Clone, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    pub id: HummockSstableObjectId,
    /// The meta. `Sstable::new` moves `bloom_filter` out of it, so that field is left empty here.
    pub meta: SstableMeta,
    /// Filter reader built by `Sstable::new` from the bytes taken out of `meta.bloom_filter`.
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
    /// When `true`, serialization does not write the filter bytes back into the serialized meta.
    #[serde(skip)]
    skip_bloom_filter_in_serde: bool,
}
145
146impl Serialize for Sstable {
147 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
148 where
149 S: serde::Serializer,
150 {
151 let mut serde_sstable = SerdeSstable {
152 id: self.id,
153 meta: self.meta.clone(),
154 };
155 if !self.skip_bloom_filter_in_serde {
156 serde_sstable.meta.bloom_filter = self.filter_reader.encode_to_bytes();
157 }
158 serde_sstable.serialize(serializer)
159 }
160}
161
162impl Debug for Sstable {
163 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164 f.debug_struct("Sstable")
165 .field("id", &self.id)
166 .field("meta", &self.meta)
167 .finish()
168 }
169}
170
171impl Sstable {
172 pub fn new(
173 id: HummockSstableObjectId,
174 mut meta: SstableMeta,
175 skip_bloom_filter_in_serde: bool,
176 ) -> Self {
177 let filter_data = std::mem::take(&mut meta.bloom_filter);
178 let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
179 Self {
180 id,
181 meta,
182 filter_reader,
183 skip_bloom_filter_in_serde,
184 }
185 }
186
187 #[inline(always)]
188 pub fn has_filter(&self) -> bool {
189 !self.filter_reader.is_empty()
190 }
191
192 pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
193 let block_meta = &self.meta.block_metas[block_index];
194 let range =
195 block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
196 let uncompressed_capacity = block_meta.uncompressed_size as usize;
197 (range, uncompressed_capacity)
198 }
199
200 #[inline(always)]
201 pub fn hash_for_filter(dist_key: &[u8], table_id: u32) -> u64 {
202 let dist_key_hash = xxh64::xxh64(dist_key, 0);
203 (table_id as u64).bitxor(dist_key_hash)
205 }
206
207 #[inline(always)]
208 pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
209 self.filter_reader
210 .may_match(&self.meta.block_metas, user_key_range, hash)
211 }
212
213 #[inline(always)]
214 pub fn block_count(&self) -> usize {
215 self.meta.block_metas.len()
216 }
217
218 #[inline(always)]
219 pub fn estimate_size(&self) -> usize {
220 8 + self.filter_reader.estimate_size() + self.meta.encoded_size()
221 }
222}
223
/// Metadata of one block inside an SSTable.
#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    /// Encoded full key; `BlockMeta::table_id` decodes it with `FullKey::decode`.
    // NOTE(review): presumably the smallest key stored in the block — confirm against the builder.
    pub smallest_key: Vec<u8>,
    /// Start of the block's byte range (see `Sstable::calculate_block_info`).
    pub offset: u32,
    /// Length of the block's byte range (see `Sstable::calculate_block_info`).
    pub len: u32,
    /// Uncompressed size of the block.
    pub uncompressed_size: u32,
    /// Not present in the version-1 format; `decode_from_v1` sets it to `0`.
    pub total_key_count: u32,
    /// Not present in the version-1 format; `decode_from_v1` sets it to `0`.
    pub stale_key_count: u32,
}
233
234impl BlockMeta {
235 pub fn encode(&self, mut buf: impl BufMut) {
241 buf.put_u32_le(self.offset);
242 buf.put_u32_le(self.len);
243 buf.put_u32_le(self.uncompressed_size);
244 buf.put_u32_le(self.total_key_count);
245 buf.put_u32_le(self.stale_key_count);
246 put_length_prefixed_slice(buf, &self.smallest_key);
247 }
248
249 pub fn decode(buf: &mut &[u8]) -> Self {
250 let offset = buf.get_u32_le();
251 let len = buf.get_u32_le();
252 let uncompressed_size = buf.get_u32_le();
253
254 let total_key_count = buf.get_u32_le();
255 let stale_key_count = buf.get_u32_le();
256 let smallest_key = get_length_prefixed_slice(buf);
257 Self {
258 smallest_key,
259 offset,
260 len,
261 uncompressed_size,
262 total_key_count,
263 stale_key_count,
264 }
265 }
266
267 pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
268 let offset = buf.get_u32_le();
269 let len = buf.get_u32_le();
270 let uncompressed_size = buf.get_u32_le();
271 let total_key_count = 0;
272 let stale_key_count = 0;
273 let smallest_key = get_length_prefixed_slice(buf);
274 Self {
275 smallest_key,
276 offset,
277 len,
278 uncompressed_size,
279 total_key_count,
280 stale_key_count,
281 }
282 }
283
284 #[inline]
285 pub fn encoded_size(&self) -> usize {
286 24 + self.smallest_key.len()
287 }
288
289 pub fn table_id(&self) -> TableId {
290 FullKey::decode(&self.smallest_key).user_key.table_id
291 }
292}
293
/// Metadata of an SSTable, encoded by `SstableMeta::encode_to` and read back by
/// `SstableMeta::decode`.
#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    /// Per-block metadata, in encoding order.
    pub block_metas: Vec<BlockMeta>,
    /// Raw filter bytes. `Sstable::new` moves them out to build its filter reader.
    pub bloom_filter: Vec<u8>,
    // NOTE(review): presumably an estimate of the SSTable size — unit not visible here, confirm.
    pub estimated_size: u32,
    // NOTE(review): presumably the number of keys in the SSTable — confirm.
    pub key_count: u32,
    /// Encoded full key; decoded with `FullKey::decode` when building overflow panic messages.
    pub smallest_key: Vec<u8>,
    pub largest_key: Vec<u8>,
    // NOTE(review): presumably the offset of the meta within the SSTable object — confirm.
    pub meta_offset: u64,
    /// Deprecated range-tombstone events. They are still encoded and decoded; `decode` logs a
    /// warning when the list is non-empty.
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    /// Format version read from the footer by `decode`. `encode_to` always writes the current
    /// `VERSION`, regardless of this field.
    pub version: u32,
}
321
322impl SstableMeta {
323 pub fn encode_to_bytes(&self) -> Vec<u8> {
338 let encoded_size = self.encoded_size();
339 let mut buf = Vec::with_capacity(encoded_size);
340 self.encode_to(&mut buf);
341 buf
342 }
343
344 pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
345 let start = buf.as_ref().len();
346
347 buf.put_u32_le(
348 utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
349 let tmp_full_key = FullKey::decode(&self.smallest_key);
350 panic!(
351 "WARN overflow can't convert block_metas_len {} into u32 table {}",
352 self.block_metas.len(),
353 tmp_full_key.user_key.table_id,
354 )
355 }),
356 );
357 for block_meta in &self.block_metas {
358 block_meta.encode(&mut buf);
359 }
360 put_length_prefixed_slice(&mut buf, &self.bloom_filter);
361 buf.put_u32_le(self.estimated_size);
362 buf.put_u32_le(self.key_count);
363 put_length_prefixed_slice(&mut buf, &self.smallest_key);
364 put_length_prefixed_slice(&mut buf, &self.largest_key);
365 #[expect(deprecated)]
366 buf.put_u32_le(
367 utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
368 let tmp_full_key = FullKey::decode(&self.smallest_key);
369 panic!(
370 "WARN overflow can't convert monotonic_tombstone_events_len {} into u32 table {}",
371 self.monotonic_tombstone_events.len(),
372 tmp_full_key.user_key.table_id,
373 )
374 }),
375 );
376 #[expect(deprecated)]
377 for monotonic_tombstone_event in &self.monotonic_tombstone_events {
378 monotonic_tombstone_event.encode(&mut buf);
379 }
380 buf.put_u64_le(self.meta_offset);
381
382 let end = buf.as_ref().len();
383
384 let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
385 buf.put_u64_le(checksum);
386 buf.put_u32_le(VERSION);
387 buf.put_u32_le(MAGIC);
388 }
389
390 pub fn decode(buf: &[u8]) -> HummockResult<Self> {
391 let mut cursor = buf.len();
392
393 cursor -= 4;
394 let magic = (&buf[cursor..]).get_u32_le();
395 if magic != MAGIC {
396 return Err(HummockError::magic_mismatch(MAGIC, magic));
397 }
398
399 cursor -= 4;
400 let version = (&buf[cursor..cursor + 4]).get_u32_le();
401 if version != VERSION && version != OLD_VERSION {
402 return Err(HummockError::invalid_format_version(version));
403 }
404
405 cursor -= 8;
406 let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
407 let buf = &mut &buf[..cursor];
408 xxhash64_verify(buf, checksum)?;
409
410 let block_meta_count = buf.get_u32_le() as usize;
411 let mut block_metas = Vec::with_capacity(block_meta_count);
412 if version == OLD_VERSION {
413 for _ in 0..block_meta_count {
414 block_metas.push(BlockMeta::decode_from_v1(buf));
415 }
416 } else {
417 for _ in 0..block_meta_count {
418 block_metas.push(BlockMeta::decode(buf));
419 }
420 }
421
422 let bloom_filter = get_length_prefixed_slice(buf);
423 let estimated_size = buf.get_u32_le();
424 let key_count = buf.get_u32_le();
425 let smallest_key = get_length_prefixed_slice(buf);
426 let largest_key = get_length_prefixed_slice(buf);
427 let tomb_event_count = buf.get_u32_le() as usize;
428 let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
429 for _ in 0..tomb_event_count {
430 let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
431 monotonic_tombstone_events.push(monotonic_tombstone_event);
432 }
433 let meta_offset = buf.get_u64_le();
434
435 if !monotonic_tombstone_events.is_empty() {
436 warn!(
437 count = monotonic_tombstone_events.len(),
438 tables = ?monotonic_tombstone_events
439 .iter()
440 .map(|event| event.event_key.left_user_key.table_id)
441 .collect::<HashSet<_>>(),
442 "read non-empty range tombstones");
443 }
444
445 #[expect(deprecated)]
446 Ok(Self {
447 block_metas,
448 bloom_filter,
449 estimated_size,
450 key_count,
451 smallest_key,
452 largest_key,
453 meta_offset,
454 monotonic_tombstone_events,
455 version,
456 })
457 }
458
459 #[inline]
460 pub fn encoded_size(&self) -> usize {
461 4 + self
463 .block_metas
464 .iter()
465 .map(|block_meta| block_meta.encoded_size())
466 .sum::<usize>()
467 + 4 + 4 + self.bloom_filter.len()
470 + 4 + 4 + 4 + self.smallest_key.len()
474 + 4 + self.largest_key.len()
476 + 8 + 8 + 4 + 4 }
481}
482
/// Options for SSTable iterators.
// NOTE(review): how each option is consumed is defined by the iterators, which are not visible
// in this file — the field notes below only state what this file shows.
#[derive(Default)]
pub struct SstableIteratorReadOptions {
    /// Copied from `ReadOptions::cache_policy` by `from_read_options`.
    pub cache_policy: CachePolicy,
    /// `None` when built by `from_read_options`.
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    /// `0` when built by `from_read_options`.
    pub max_preload_retry_times: usize,
    /// Copied from `ReadOptions::prefetch_options.for_large_query` by `from_read_options`.
    pub prefetch_for_large_query: bool,
}
490
491impl SstableIteratorReadOptions {
492 pub fn from_read_options(read_options: &ReadOptions) -> Self {
493 Self {
494 cache_policy: read_options.cache_policy,
495 must_iterated_end_user_key: None,
496 max_preload_retry_times: 0,
497 prefetch_for_large_query: read_options.prefetch_options.for_large_query,
498 }
499 }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

    /// Serializes `sstable` with bincode and deserializes the bytes back.
    fn bincode_round_trip(sstable: &Sstable) -> Sstable {
        let bytes = bincode::serialize(sstable).unwrap();
        bincode::deserialize(&bytes).unwrap()
    }

    /// The encoded meta must have exactly `encoded_size()` bytes and decode to an equal value.
    #[test]
    fn test_sstable_meta_enc_dec() {
        let first_block = BlockMeta {
            smallest_key: b"0-smallest-key".to_vec(),
            len: 100,
            ..Default::default()
        };
        let second_block = BlockMeta {
            smallest_key: b"5-some-key".to_vec(),
            offset: 100,
            len: 100,
            ..Default::default()
        };
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![first_block, second_block],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let expected_len = meta.encoded_size();
        let encoded = meta.encode_to_bytes();
        assert_eq!(expected_len, encoded.len());
        let decoded_meta = SstableMeta::decode(&encoded[..]).unwrap();
        assert_eq!(decoded_meta, meta);

        println!("buf: {}", encoded.len());
    }

    /// The filter survives a serde round trip only when it is not skipped.
    #[tokio::test]
    async fn test_sstable_serde() {
        let kv_pairs = (0..100).map(|idx| {
            (
                iterator_test_key_of(idx),
                HummockValue::put(format!("overlapped_new_{idx}").into_bytes()),
            )
        });
        let (_, meta) = gen_test_sstable_data(default_builder_opt_for_test(), kv_pairs).await;

        // Filter skipped: the restored sstable has an empty filter.
        let original = Sstable::new(42.into(), meta.clone(), true);
        let restored = bincode_round_trip(&original);

        assert_eq!(restored.id, original.id);
        assert_eq!(restored.meta, original.meta);
        assert!(!original.filter_reader.is_empty());
        assert!(restored.filter_reader.is_empty());

        // Filter kept: the restored filter encodes to the same bytes.
        let original = Sstable::new(42.into(), meta, false);
        let restored = bincode_round_trip(&original);

        assert_eq!(restored.id, original.id);
        assert_eq!(restored.meta, original.meta);
        assert_eq!(
            restored.filter_reader.encode_to_bytes(),
            original.filter_reader.encode_to_bytes()
        );
    }
}