mod block;

use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::ops::{BitXor, Bound, Range};

pub use block::*;
mod block_iterator;
pub use block_iterator::*;
mod bloom;
mod xor_filter;
pub use bloom::BloomFilterBuilder;
use serde::{Deserialize, Serialize};
pub use xor_filter::{
    BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder, XorFilterReader,
};
pub mod builder;
pub use builder::*;
pub mod writer;
use risingwave_common::catalog::TableId;
pub use writer::*;
mod forward_sstable_iterator;
pub mod multi_builder;
use bytes::{Buf, BufMut};
pub use forward_sstable_iterator::*;
use tracing::warn;
mod backward_sstable_iterator;
pub use backward_sstable_iterator::*;
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};

mod filter;
mod sstable_object_id_manager;
mod utils;

pub use filter::FilterBuilder;
pub use sstable_object_id_manager::*;
pub use utils::CompressionAlgorithm;
use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
use xxhash_rust::{xxh32, xxh64};

use self::utils::{xxhash64_checksum, xxhash64_verify};
use super::{HummockError, HummockResult};
use crate::hummock::CachePolicy;
use crate::store::ReadOptions;

const MAGIC: u32 = 0x5785ab73;
const OLD_VERSION: u32 = 1;
const VERSION: u32 = 2;

/// Legacy range-delete event, retained only so that metadata written by older
/// SST versions can still be (de)serialized.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    pub new_epoch: HummockEpoch,
}

impl MonotonicDeleteEvent {
    pub fn encode(&self, mut buf: impl BufMut) {
        self.event_key
            .left_user_key
            .encode_length_prefixed(&mut buf);
        buf.put_u8(if self.event_key.is_exclude_left_key {
            1
        } else {
            0
        });
        buf.put_u64_le(self.new_epoch);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
        let user_key = UserKey::decode_length_prefixed(buf);
        let exclude_left_key_flag = buf.get_u8();
        let is_exclude_left_key = match exclude_left_key_flag {
            0 => false,
            1 => true,
            _ => panic!("exclusive flag should be either 0 or 1"),
        };
        let new_epoch = buf.get_u64_le();
        Self {
            event_key: PointRange {
                left_user_key: user_key,
                is_exclude_left_key,
            },
            new_epoch,
        }
    }
}

#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    id: HummockSstableObjectId,
    meta: SstableMeta,
}

impl From<SerdeSstable> for Sstable {
    fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
        // `Sstable::new` rebuilds the `#[serde(skip)]`-ed filter reader from
        // the filter bytes carried in `meta`.
        Sstable::new(id, meta)
    }
}

#[derive(Clone, Serialize, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    pub id: HummockSstableObjectId,
    pub meta: SstableMeta,
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
}

impl Debug for Sstable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sstable")
            .field("id", &self.id)
            .field("meta", &self.meta)
            .finish()
    }
}

impl Sstable {
    pub fn new(id: HummockSstableObjectId, mut meta: SstableMeta) -> Self {
        // Move the raw filter bytes out of the meta and wrap them in a reader.
        let filter_data = std::mem::take(&mut meta.bloom_filter);
        let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
        Self {
            id,
            meta,
            filter_reader,
        }
    }

    #[inline(always)]
    pub fn has_bloom_filter(&self) -> bool {
        !self.filter_reader.is_empty()
    }

    /// Returns the byte range of the block within the SST object and its
    /// uncompressed capacity.
    pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
        let block_meta = &self.meta.block_metas[block_index];
        let range =
            block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
        let uncompressed_capacity = block_meta.uncompressed_size as usize;
        (range, uncompressed_capacity)
    }

    #[inline(always)]
    pub fn hash_for_bloom_filter_u32(dist_key: &[u8], table_id: u32) -> u32 {
        let dist_key_hash = xxh32::xxh32(dist_key, 0);
        // Mix the table id into the hash so that identical keys from different
        // tables produce different filter entries.
        table_id.bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn hash_for_bloom_filter(dist_key: &[u8], table_id: u32) -> u64 {
        let dist_key_hash = xxh64::xxh64(dist_key, 0);
        (table_id as u64).bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
        self.filter_reader.may_match(user_key_range, hash)
    }

    #[inline(always)]
    pub fn block_count(&self) -> usize {
        self.meta.block_metas.len()
    }

    #[inline(always)]
    pub fn estimate_size(&self) -> usize {
        8 /* id */ + self.filter_reader.estimate_size() + self.meta.encoded_size()
    }
}

#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    pub smallest_key: Vec<u8>,
    pub offset: u32,
    pub len: u32,
    pub uncompressed_size: u32,
    pub total_key_count: u32,
    pub stale_key_count: u32,
}

impl BlockMeta {
    /// Format:
    ///
    /// ```plain
    /// | offset (4B) | len (4B) | uncompressed size (4B) |
    /// | total key count (4B) | stale key count (4B) |
    /// | smallest key len (4B) | smallest key |
    /// ```
    pub fn encode(&self, mut buf: impl BufMut) {
        buf.put_u32_le(self.offset);
        buf.put_u32_le(self.len);
        buf.put_u32_le(self.uncompressed_size);
        buf.put_u32_le(self.total_key_count);
        buf.put_u32_le(self.stale_key_count);
        put_length_prefixed_slice(buf, &self.smallest_key);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = buf.get_u32_le();
        let stale_key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    /// Decodes the v1 layout, which lacks the key-count fields.
    pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = 0;
        let stale_key_count = 0;
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        // Five u32 fields plus the 4B length prefix of `smallest_key`.
        24 + self.smallest_key.len()
    }

    pub fn table_id(&self) -> TableId {
        FullKey::decode(&self.smallest_key).user_key.table_id
    }
}

#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    pub block_metas: Vec<BlockMeta>,
    pub bloom_filter: Vec<u8>,
    pub estimated_size: u32,
    pub key_count: u32,
    pub smallest_key: Vec<u8>,
    pub largest_key: Vec<u8>,
    pub meta_offset: u64,
    /// Range tombstones are no longer written; this field is kept only so that
    /// metadata from older SSTs can still be decoded.
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    pub version: u32,
}
303
304impl SstableMeta {
305 pub fn encode_to_bytes(&self) -> Vec<u8> {
320 let encoded_size = self.encoded_size();
321 let mut buf = Vec::with_capacity(encoded_size);
322 self.encode_to(&mut buf);
323 buf
324 }
325
326 pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
327 let start = buf.as_ref().len();
328
329 buf.put_u32_le(
330 utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
331 let tmp_full_key = FullKey::decode(&self.smallest_key);
332 panic!(
333 "WARN overflow can't convert block_metas_len {} into u32 table {}",
334 self.block_metas.len(),
335 tmp_full_key.user_key.table_id,
336 )
337 }),
338 );
339 for block_meta in &self.block_metas {
340 block_meta.encode(&mut buf);
341 }
342 put_length_prefixed_slice(&mut buf, &self.bloom_filter);
343 buf.put_u32_le(self.estimated_size);
344 buf.put_u32_le(self.key_count);
345 put_length_prefixed_slice(&mut buf, &self.smallest_key);
346 put_length_prefixed_slice(&mut buf, &self.largest_key);
347 #[expect(deprecated)]
348 buf.put_u32_le(
349 utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
350 let tmp_full_key = FullKey::decode(&self.smallest_key);
351 panic!(
352 "WARN overflow can't convert monotonic_tombstone_events_len {} into u32 table {}",
353 self.monotonic_tombstone_events.len(),
354 tmp_full_key.user_key.table_id,
355 )
356 }),
357 );
358 #[expect(deprecated)]
359 for monotonic_tombstone_event in &self.monotonic_tombstone_events {
360 monotonic_tombstone_event.encode(&mut buf);
361 }
362 buf.put_u64_le(self.meta_offset);
363
364 let end = buf.as_ref().len();
365
366 let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
367 buf.put_u64_le(checksum);
368 buf.put_u32_le(VERSION);
369 buf.put_u32_le(MAGIC);
370 }

    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
        let mut cursor = buf.len();

        // Parse the footer back to front: magic, version, then checksum.
        cursor -= 4;
        let magic = (&buf[cursor..]).get_u32_le();
        if magic != MAGIC {
            return Err(HummockError::magic_mismatch(MAGIC, magic));
        }

        cursor -= 4;
        let version = (&buf[cursor..cursor + 4]).get_u32_le();
        if version != VERSION && version != OLD_VERSION {
            return Err(HummockError::invalid_format_version(version));
        }

        cursor -= 8;
        let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
        let buf = &mut &buf[..cursor];
        xxhash64_verify(buf, checksum)?;

        let block_meta_count = buf.get_u32_le() as usize;
        let mut block_metas = Vec::with_capacity(block_meta_count);
        if version == OLD_VERSION {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode_from_v1(buf));
            }
        } else {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode(buf));
            }
        }

        let bloom_filter = get_length_prefixed_slice(buf);
        let estimated_size = buf.get_u32_le();
        let key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        let largest_key = get_length_prefixed_slice(buf);
        let tomb_event_count = buf.get_u32_le() as usize;
        let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
        for _ in 0..tomb_event_count {
            let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
            monotonic_tombstone_events.push(monotonic_tombstone_event);
        }
        let meta_offset = buf.get_u64_le();

        if !monotonic_tombstone_events.is_empty() {
            warn!(
                count = monotonic_tombstone_events.len(),
                tables = ?monotonic_tombstone_events
                    .iter()
                    .map(|event| event.event_key.left_user_key.table_id)
                    .collect::<HashSet<_>>(),
                "read non-empty range tombstones");
        }

        #[expect(deprecated)]
        Ok(Self {
            block_metas,
            bloom_filter,
            estimated_size,
            key_count,
            smallest_key,
            largest_key,
            meta_offset,
            monotonic_tombstone_events,
            version,
        })
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        4 // block meta count
            + self
                .block_metas
                .iter()
                .map(|block_meta| block_meta.encoded_size())
                .sum::<usize>()
            + 4 // tombstone event count
            + 4 // bloom filter len
            + self.bloom_filter.len()
            + 4 // estimated size
            + 4 // key count
            + 4 // smallest key len
            + self.smallest_key.len()
            + 4 // largest key len
            + self.largest_key.len()
            + 8 // meta offset
            + 8 // checksum
            + 4 // version
            + 4 // magic
    }
}

#[derive(Default)]
pub struct SstableIteratorReadOptions {
    pub cache_policy: CachePolicy,
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    pub max_preload_retry_times: usize,
    pub prefetch_for_large_query: bool,
}

impl SstableIteratorReadOptions {
    pub fn from_read_options(read_options: &ReadOptions) -> Self {
        Self {
            cache_policy: read_options.cache_policy,
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: read_options.prefetch_options.for_large_query,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

    #[test]
    fn test_sstable_meta_enc_dec() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![
                BlockMeta {
                    smallest_key: b"0-smallest-key".to_vec(),
                    len: 100,
                    ..Default::default()
                },
                BlockMeta {
                    smallest_key: b"5-some-key".to_vec(),
                    offset: 100,
                    len: 100,
                    ..Default::default()
                },
            ],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let sz = meta.encoded_size();
        let buf = meta.encode_to_bytes();
        assert_eq!(sz, buf.len());
        let decoded_meta = SstableMeta::decode(&buf[..]).unwrap();
        assert_eq!(decoded_meta, meta);

        println!("buf: {}", buf.len());
    }
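
    // Added sketch: a round-trip check for `BlockMeta::encode`/`decode`,
    // mirroring the flat little-endian layout documented on `encode` above.
    #[test]
    fn test_block_meta_enc_dec() {
        let block_meta = BlockMeta {
            smallest_key: b"0-smallest-key".to_vec(),
            offset: 100,
            len: 100,
            uncompressed_size: 245,
            total_key_count: 46,
            stale_key_count: 8,
        };
        let mut buf: Vec<u8> = Vec::new();
        block_meta.encode(&mut buf);
        assert_eq!(buf.len(), block_meta.encoded_size());
        assert_eq!(BlockMeta::decode(&mut &buf[..]), block_meta);
    }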

    #[tokio::test]
    async fn test_sstable_serde() {
        let (_, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            (0..100).map(|x| {
                (
                    iterator_test_key_of(x),
                    HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec()),
                )
            }),
        )
        .await;

        let buffer = bincode::serialize(&meta).unwrap();

        let m: SstableMeta = bincode::deserialize(&buffer).unwrap();

        assert_eq!(meta, m);

        println!("{} vs {}", buffer.len(), meta.encoded_size());
    }
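
    // Added sketch: decoding must reject a buffer whose trailing magic number
    // is wrong rather than return garbage metadata.
    #[test]
    fn test_sstable_meta_decode_bad_magic() {
        let mut buf = SstableMeta::default().encode_to_bytes();
        let magic_start = buf.len() - 4;
        buf[magic_start..].copy_from_slice(&0u32.to_le_bytes());
        assert!(SstableMeta::decode(&buf[..]).is_err());
    }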
}