mod block;

use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::ops::{BitXor, Bound, Range};

pub use block::*;
mod block_iterator;
pub use block_iterator::*;
mod bloom;
mod xor_filter;
pub use bloom::BloomFilterBuilder;
use serde::{Deserialize, Serialize};
pub use xor_filter::{
    BlockedXor16FilterBuilder, Xor8FilterBuilder, Xor16FilterBuilder, XorFilterReader,
};
pub mod builder;
pub use builder::*;
pub mod writer;
use risingwave_common::catalog::TableId;
pub use writer::*;
mod forward_sstable_iterator;
pub mod multi_builder;
use bytes::{Buf, BufMut};
pub use forward_sstable_iterator::*;
use tracing::warn;
mod backward_sstable_iterator;
pub use backward_sstable_iterator::*;
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};

mod filter;
mod utils;

pub use filter::FilterBuilder;
pub use utils::CompressionAlgorithm;
use utils::{get_length_prefixed_slice, put_length_prefixed_slice};
use xxhash_rust::{xxh32, xxh64};

use self::utils::{xxhash64_checksum, xxhash64_verify};
use super::{HummockError, HummockResult};
use crate::hummock::CachePolicy;
use crate::store::ReadOptions;

const MAGIC: u32 = 0x5785ab73;
const OLD_VERSION: u32 = 1;
const VERSION: u32 = 2;
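
// Version history, inferred from the decoders below: v1 (`OLD_VERSION`) block
// metas carry no per-block key counts (see `BlockMeta::decode_from_v1`), while
// v2 (`VERSION`), the current format, adds `total_key_count` and
// `stale_key_count` to each block meta.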

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct MonotonicDeleteEvent {
    pub event_key:
        risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange,
    pub new_epoch: HummockEpoch,
}

impl MonotonicDeleteEvent {
    pub fn encode(&self, mut buf: impl BufMut) {
        self.event_key
            .left_user_key
            .encode_length_prefixed(&mut buf);
        buf.put_u8(if self.event_key.is_exclude_left_key {
            1
        } else {
            0
        });
        buf.put_u64_le(self.new_epoch);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*;
        let user_key = UserKey::decode_length_prefixed(buf);
        let exclude_left_key_flag = buf.get_u8();
        let is_exclude_left_key = match exclude_left_key_flag {
            0 => false,
            1 => true,
            _ => panic!("exclusive flag should be either 0 or 1"),
        };
        let new_epoch = buf.get_u64_le();
        Self {
            event_key: PointRange {
                left_user_key: user_key,
                is_exclude_left_key,
            },
            new_epoch,
        }
    }
}

/// Intermediate type for deserialization: `filter_reader` is skipped during
/// serialization, so deserializing goes through `SerdeSstable` and rebuilds
/// the filter reader via `Sstable::new`.
#[derive(Serialize, Deserialize)]
struct SerdeSstable {
    id: HummockSstableObjectId,
    meta: SstableMeta,
}

impl From<SerdeSstable> for Sstable {
    fn from(SerdeSstable { id, meta }: SerdeSstable) -> Self {
        Sstable::new(id, meta)
    }
}

#[derive(Clone, Serialize, Deserialize)]
#[serde(from = "SerdeSstable")]
pub struct Sstable {
    pub id: HummockSstableObjectId,
    pub meta: SstableMeta,
    #[serde(skip)]
    pub filter_reader: XorFilterReader,
}

impl Debug for Sstable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sstable")
            .field("id", &self.id)
            .field("meta", &self.meta)
            .finish()
    }
}

impl Sstable {
    pub fn new(id: HummockSstableObjectId, mut meta: SstableMeta) -> Self {
        let filter_data = std::mem::take(&mut meta.bloom_filter);
        let filter_reader = XorFilterReader::new(&filter_data, &meta.block_metas);
        Self {
            id,
            meta,
            filter_reader,
        }
    }

    #[inline(always)]
    pub fn has_bloom_filter(&self) -> bool {
        !self.filter_reader.is_empty()
    }

    /// Returns the byte range of the block within the SST object and its
    /// uncompressed capacity.
    pub fn calculate_block_info(&self, block_index: usize) -> (Range<usize>, usize) {
        let block_meta = &self.meta.block_metas[block_index];
        let range =
            block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
        let uncompressed_capacity = block_meta.uncompressed_size as usize;
        (range, uncompressed_capacity)
    }

    /// 32-bit filter hash: `xxh32` of the distribution key XORed with the table id.
    #[inline(always)]
    pub fn hash_for_bloom_filter_u32(dist_key: &[u8], table_id: u32) -> u32 {
        let dist_key_hash = xxh32::xxh32(dist_key, 0);
        table_id.bitxor(dist_key_hash)
    }

    /// 64-bit filter hash: `xxh64` of the distribution key XORed with the table id.
    #[inline(always)]
    pub fn hash_for_bloom_filter(dist_key: &[u8], table_id: u32) -> u64 {
        let dist_key_hash = xxh64::xxh64(dist_key, 0);
        (table_id as u64).bitxor(dist_key_hash)
    }

    #[inline(always)]
    pub fn may_match_hash(&self, user_key_range: &UserKeyRangeRef<'_>, hash: u64) -> bool {
        self.filter_reader.may_match(user_key_range, hash)
    }

    #[inline(always)]
    pub fn block_count(&self) -> usize {
        self.meta.block_metas.len()
    }

    #[inline(always)]
    pub fn estimate_size(&self) -> usize {
        8 /* id */ + self.filter_reader.estimate_size() + self.meta.encoded_size()
    }
}

#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockMeta {
    pub smallest_key: Vec<u8>,
    pub offset: u32,
    pub len: u32,
    pub uncompressed_size: u32,
    pub total_key_count: u32,
    pub stale_key_count: u32,
}

impl BlockMeta {
    /// Encoded layout (little-endian), mirrored by `decode`:
    ///
    /// ```plain
    /// | offset (4B) | len (4B) | uncompressed size (4B)
    /// | total key count (4B) | stale key count (4B)
    /// | smallest key len (4B) | smallest key |
    /// ```
    pub fn encode(&self, mut buf: impl BufMut) {
        buf.put_u32_le(self.offset);
        buf.put_u32_le(self.len);
        buf.put_u32_le(self.uncompressed_size);
        buf.put_u32_le(self.total_key_count);
        buf.put_u32_le(self.stale_key_count);
        put_length_prefixed_slice(buf, &self.smallest_key);
    }

    pub fn decode(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        let total_key_count = buf.get_u32_le();
        let stale_key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    pub fn decode_from_v1(buf: &mut &[u8]) -> Self {
        let offset = buf.get_u32_le();
        let len = buf.get_u32_le();
        let uncompressed_size = buf.get_u32_le();
        // v1 block metas carry no key counts; default them to zero.
        let total_key_count = 0;
        let stale_key_count = 0;
        let smallest_key = get_length_prefixed_slice(buf);
        Self {
            smallest_key,
            offset,
            len,
            uncompressed_size,
            total_key_count,
            stale_key_count,
        }
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        // Five u32 fields plus the length-prefixed smallest key.
        24 + self.smallest_key.len()
    }

    pub fn table_id(&self) -> TableId {
        FullKey::decode(&self.smallest_key).user_key.table_id
    }
}

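/// On-disk layout of the encoded sstable meta, reconstructed from `encode_to`
/// and `decode` below (all integers are little-endian):
///
/// ```plain
/// | block meta count N (4B) | block meta 0 | ... | block meta N-1 |
/// | bloom filter len (4B) | bloom filter |
/// | estimated size (4B) | key count (4B) |
/// | smallest key len (4B) | smallest key |
/// | largest key len (4B) | largest key |
/// | tombstone event count K (4B) | event 0 | ... | event K-1 |
/// | meta offset (8B) |
/// | checksum (8B) | version (4B) | magic (4B) |
/// ```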
#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct SstableMeta {
    pub block_metas: Vec<BlockMeta>,
    pub bloom_filter: Vec<u8>,
    pub estimated_size: u32,
    pub key_count: u32,
    pub smallest_key: Vec<u8>,
    pub largest_key: Vec<u8>,
    pub meta_offset: u64,
    #[deprecated]
    pub monotonic_tombstone_events: Vec<MonotonicDeleteEvent>,
    pub version: u32,
}

impl SstableMeta {
    pub fn encode_to_bytes(&self) -> Vec<u8> {
        let encoded_size = self.encoded_size();
        let mut buf = Vec::with_capacity(encoded_size);
        self.encode_to(&mut buf);
        buf
    }

    pub fn encode_to(&self, mut buf: impl BufMut + AsRef<[u8]>) {
        let start = buf.as_ref().len();

        buf.put_u32_le(
            utils::checked_into_u32(self.block_metas.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "block_metas len {} overflows u32, table {}",
                    self.block_metas.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        for block_meta in &self.block_metas {
            block_meta.encode(&mut buf);
        }
        put_length_prefixed_slice(&mut buf, &self.bloom_filter);
        buf.put_u32_le(self.estimated_size);
        buf.put_u32_le(self.key_count);
        put_length_prefixed_slice(&mut buf, &self.smallest_key);
        put_length_prefixed_slice(&mut buf, &self.largest_key);
        #[expect(deprecated)]
        buf.put_u32_le(
            utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| {
                let tmp_full_key = FullKey::decode(&self.smallest_key);
                panic!(
                    "monotonic_tombstone_events len {} overflows u32, table {}",
                    self.monotonic_tombstone_events.len(),
                    tmp_full_key.user_key.table_id,
                )
            }),
        );
        #[expect(deprecated)]
        for monotonic_tombstone_event in &self.monotonic_tombstone_events {
            monotonic_tombstone_event.encode(&mut buf);
        }
        buf.put_u64_le(self.meta_offset);

        let end = buf.as_ref().len();

        let checksum = xxhash64_checksum(&buf.as_ref()[start..end]);
        buf.put_u64_le(checksum);
        buf.put_u32_le(VERSION);
        buf.put_u32_le(MAGIC);
    }

    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
        let mut cursor = buf.len();

        // Parse the fixed-size footer back to front: magic (4B), version (4B),
        // then checksum (8B).
        cursor -= 4;
        let magic = (&buf[cursor..]).get_u32_le();
        if magic != MAGIC {
            return Err(HummockError::magic_mismatch(MAGIC, magic));
        }

        cursor -= 4;
        let version = (&buf[cursor..cursor + 4]).get_u32_le();
        if version != VERSION && version != OLD_VERSION {
            return Err(HummockError::invalid_format_version(version));
        }

        cursor -= 8;
        let checksum = (&buf[cursor..cursor + 8]).get_u64_le();
        // The checksum covers everything before the footer; verify it before
        // decoding the fields front to back.
        let buf = &mut &buf[..cursor];
        xxhash64_verify(buf, checksum)?;

        let block_meta_count = buf.get_u32_le() as usize;
        let mut block_metas = Vec::with_capacity(block_meta_count);
        if version == OLD_VERSION {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode_from_v1(buf));
            }
        } else {
            for _ in 0..block_meta_count {
                block_metas.push(BlockMeta::decode(buf));
            }
        }

        let bloom_filter = get_length_prefixed_slice(buf);
        let estimated_size = buf.get_u32_le();
        let key_count = buf.get_u32_le();
        let smallest_key = get_length_prefixed_slice(buf);
        let largest_key = get_length_prefixed_slice(buf);
        let tomb_event_count = buf.get_u32_le() as usize;
        let mut monotonic_tombstone_events = Vec::with_capacity(tomb_event_count);
        for _ in 0..tomb_event_count {
            let monotonic_tombstone_event = MonotonicDeleteEvent::decode(buf);
            monotonic_tombstone_events.push(monotonic_tombstone_event);
        }
        let meta_offset = buf.get_u64_le();

        if !monotonic_tombstone_events.is_empty() {
            warn!(
                count = monotonic_tombstone_events.len(),
                tables = ?monotonic_tombstone_events
                    .iter()
                    .map(|event| event.event_key.left_user_key.table_id)
                    .collect::<HashSet<_>>(),
                "read non-empty range tombstones");
        }

        #[expect(deprecated)]
        Ok(Self {
            block_metas,
            bloom_filter,
            estimated_size,
            key_count,
            smallest_key,
            largest_key,
            meta_offset,
            monotonic_tombstone_events,
            version,
        })
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        4 // block meta count
            + self
                .block_metas
                .iter()
                .map(|block_meta| block_meta.encoded_size())
                .sum::<usize>()
            + 4 // bloom filter len
            + self.bloom_filter.len()
            + 4 // estimated size
            + 4 // key count
            + 4 // key len
            + self.smallest_key.len()
            + 4 // key len
            + self.largest_key.len()
            + 4 // tombstone event count
            + 8 // meta offset
            + 8 // checksum
            + 4 // version
            + 4 // magic
    }
}

#[derive(Default)]
pub struct SstableIteratorReadOptions {
    pub cache_policy: CachePolicy,
    pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
    pub max_preload_retry_times: usize,
    pub prefetch_for_large_query: bool,
}

impl SstableIteratorReadOptions {
    pub fn from_read_options(read_options: &ReadOptions) -> Self {
        Self {
            cache_policy: read_options.cache_policy,
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: read_options.prefetch_options.for_large_query,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::HummockValue;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, iterator_test_key_of,
    };
    use crate::hummock::test_utils::gen_test_sstable_data;

    #[test]
    fn test_sstable_meta_enc_dec() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![
                BlockMeta {
                    smallest_key: b"0-smallest-key".to_vec(),
                    len: 100,
                    ..Default::default()
                },
                BlockMeta {
                    smallest_key: b"5-some-key".to_vec(),
                    offset: 100,
                    len: 100,
                    ..Default::default()
                },
            ],
            bloom_filter: b"0123456789".to_vec(),
            estimated_size: 123,
            key_count: 123,
            smallest_key: b"0-smallest-key".to_vec(),
            largest_key: b"9-largest-key".to_vec(),
            meta_offset: 123,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let sz = meta.encoded_size();
        let buf = meta.encode_to_bytes();
        assert_eq!(sz, buf.len());
        let decoded_meta = SstableMeta::decode(&buf[..]).unwrap();
        assert_eq!(decoded_meta, meta);

        println!("buf: {}", buf.len());
    }

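    // A minimal roundtrip sketch for `BlockMeta::encode`/`BlockMeta::decode`;
    // the field values are arbitrary, chosen only to exercise every field.
    #[test]
    fn test_block_meta_enc_dec() {
        let block_meta = BlockMeta {
            smallest_key: b"0-smallest-key".to_vec(),
            offset: 100,
            len: 100,
            uncompressed_size: 0,
            total_key_count: 10,
            stale_key_count: 3,
        };
        let mut buf: Vec<u8> = vec![];
        block_meta.encode(&mut buf);
        // `encoded_size` must agree with what `encode` actually writes.
        assert_eq!(block_meta.encoded_size(), buf.len());
        let decoded = BlockMeta::decode(&mut &buf[..]);
        assert_eq!(decoded, block_meta);
    }
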
    #[tokio::test]
    async fn test_sstable_serde() {
        let (_, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            (0..100).map(|x| {
                (
                    iterator_test_key_of(x),
                    HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec()),
                )
            }),
        )
        .await;

        let buffer = bincode::serialize(&meta).unwrap();

        let m: SstableMeta = bincode::deserialize(&buffer).unwrap();

        assert_eq!(meta, m);

        println!("{} vs {}", buffer.len(), meta.encoded_size());
    }
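
    // Sketch of the table-id mixing in the filter hash: `hash_for_bloom_filter`
    // XORs `xxh64(dist_key)` with the table id, so the same key under two
    // different table ids never collides, while equal inputs hash identically.
    #[test]
    fn test_hash_for_bloom_filter() {
        let h1 = Sstable::hash_for_bloom_filter(b"dist-key", 1);
        let h2 = Sstable::hash_for_bloom_filter(b"dist-key", 2);
        assert_ne!(h1, h2);
        assert_eq!(h1, Sstable::hash_for_bloom_filter(b"dist-key", 1));
    }

    // Corruption sketch: `SstableMeta::decode` reads the trailing magic first,
    // so clobbering the last four bytes of an encoded meta must yield an error.
    // The meta below is a minimal, mostly-empty value.
    #[test]
    fn test_sstable_meta_magic_mismatch() {
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas: vec![],
            bloom_filter: vec![],
            estimated_size: 0,
            key_count: 0,
            smallest_key: vec![],
            largest_key: vec![],
            meta_offset: 0,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };
        let mut buf = meta.encode_to_bytes();
        let len = buf.len();
        buf[len - 4..].copy_from_slice(&[0u8; 4]);
        assert!(SstableMeta::decode(&buf[..]).is_err());
    }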
}