1use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::{Bytes, BytesMut};
20use risingwave_common::util::row_serde::OrderedRowSerde;
21use risingwave_hummock_sdk::compaction_group::StateTableId;
22use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, user_key};
23use risingwave_hummock_sdk::key_range::KeyRange;
24use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
25use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
26use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
27use risingwave_pb::hummock::BloomFilterType;
28
29use super::utils::CompressionAlgorithm;
30use super::{
31 BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
32 DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
33};
34use crate::compaction_catalog_manager::{
35 CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
36 FullKeyFilterKeyExtractor,
37};
38use crate::hummock::sstable::{FilterBuilder, utils};
39use crate::hummock::value::HummockValue;
40use crate::hummock::{
41 Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
42};
43use crate::monitor::CompactorMetrics;
44use crate::opts::StorageOpts;
45
/// Default target size of a finished SST file: 4 MiB.
pub const DEFAULT_SSTABLE_SIZE: usize = 4 * 1024 * 1024;
/// Default false-positive rate for the key filter.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
/// Default upper bound on the size of a single SST produced by compaction: 512 MiB.
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 * 1024 * 1024;
/// Threshold (8 KiB) below which a pending block is considered too small to be
/// flushed as-is when appending a raw block (see `add_raw_block`).
pub const MIN_BLOCK_SIZE: usize = 8 * 1024;
50
/// Tunables controlling how an [`SstableBuilder`] lays out an SST.
#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Approximate target size of the whole SST (bytes).
    pub capacity: usize,
    /// Approximate target size of each block (bytes).
    pub block_capacity: usize,
    /// Number of keys between restart points inside a block.
    pub restart_interval: usize,
    /// False-positive rate of the key filter; `<= 0.0` disables the filter.
    pub bloom_false_positive: f64,
    /// Compression applied to each block.
    pub compression_algorithm: CompressionAlgorithm,
    /// Hard upper bound on SST size; also used to derive large-key/value
    /// warning thresholds in `add_impl`.
    pub max_sst_size: u64,
}
65
66impl From<&StorageOpts> for SstableBuilderOptions {
67 fn from(options: &StorageOpts) -> SstableBuilderOptions {
68 let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
69 SstableBuilderOptions {
70 capacity,
71 block_capacity: (options.block_size_kb as usize) * (1 << 10),
72 restart_interval: DEFAULT_RESTART_INTERVAL,
73 bloom_false_positive: options.bloom_false_positive,
74 compression_algorithm: CompressionAlgorithm::None,
75 max_sst_size: options.compactor_max_sst_size,
76 }
77 }
78}
79
80impl Default for SstableBuilderOptions {
81 fn default() -> Self {
82 Self {
83 capacity: DEFAULT_SSTABLE_SIZE,
84 block_capacity: DEFAULT_BLOCK_SIZE,
85 restart_interval: DEFAULT_RESTART_INTERVAL,
86 bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
87 compression_algorithm: CompressionAlgorithm::None,
88 max_sst_size: DEFAULT_MAX_SST_SIZE,
89 }
90 }
91}
92
/// Result of [`SstableBuilder::finish`]: the SST's metadata, the writer's
/// output (`WO` is writer-specific), and statistics for metrics reporting.
pub struct SstableBuilderOutput<WO> {
    /// Metadata describing the finished SST.
    pub sst_info: LocalSstableInfo,
    /// Whatever the underlying `SstableWriter` produced on `finish`.
    pub writer_output: WO,
    /// Aggregated build statistics (see `SstableBuilderOutputStats`).
    pub stats: SstableBuilderOutputStats,
}
98
/// Builder of a single SSTable: accumulates key-value pairs into blocks,
/// maintains per-table statistics and a key filter, and streams finished
/// blocks out through `writer`.
pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Options controlling block/SST sizing and compression.
    options: SstableBuilderOptions,
    /// Destination for finished blocks and metadata.
    writer: W,
    /// Builder for the block currently being filled.
    block_builder: BlockBuilder,

    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Metadata of every block written (plus the in-progress one).
    block_metas: Vec<BlockMeta>,

    /// All state-table ids that contributed keys to this SST.
    table_ids: BTreeSet<u32>,
    /// Encoded full key of the most recently added entry.
    last_full_key: Vec<u8>,
    /// Reused encoding buffer for the current key.
    raw_key: BytesMut,
    /// Reused encoding buffer for the current value.
    raw_value: BytesMut,
    /// Table id of the most recently added key; `None` before the first key.
    last_table_id: Option<u32>,
    sstable_id: u64,

    /// Finalized statistics keyed by table id.
    table_stats: TableStatsMap,
    /// Running statistics for `last_table_id`; folded into `table_stats` by
    /// `finalize_last_table_stats` on table switch / finish.
    last_table_stats: TableStats,

    filter_builder: F,

    /// Distinct pure epochs of all added keys (ordered).
    epoch_set: BTreeSet<u64>,
    memory_limiter: Option<Arc<MemoryLimiter>>,

    /// Size of each written block, reported to metrics on finish.
    block_size_vec: Vec<usize>,
}
133
134impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
135 pub fn for_test(
136 sstable_id: u64,
137 writer: W,
138 options: SstableBuilderOptions,
139 table_id_to_vnode: HashMap<StateTableId, usize>,
140 table_id_to_watermark_serde: HashMap<
141 u32,
142 Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
143 >,
144 ) -> Self {
145 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
146 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
147 table_id_to_vnode,
148 table_id_to_watermark_serde,
149 ));
150
151 Self::new(
152 sstable_id,
153 writer,
154 Xor16FilterBuilder::new(options.capacity / DEFAULT_ENTRY_SIZE + 1),
155 options,
156 compaction_catalog_agent_ref,
157 None,
158 )
159 }
160}
161
162impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
163 pub fn new(
164 sstable_id: u64,
165 writer: W,
166 filter_builder: F,
167 options: SstableBuilderOptions,
168 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
169 memory_limiter: Option<Arc<MemoryLimiter>>,
170 ) -> Self {
171 Self {
172 options: options.clone(),
173 writer,
174 block_builder: BlockBuilder::new(BlockBuilderOptions {
175 capacity: options.block_capacity,
176 restart_interval: options.restart_interval,
177 compression_algorithm: options.compression_algorithm,
178 }),
179 filter_builder,
180 block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
181 table_ids: BTreeSet::new(),
182 last_table_id: None,
183 raw_key: BytesMut::new(),
184 raw_value: BytesMut::new(),
185 last_full_key: vec![],
186 sstable_id,
187 compaction_catalog_agent_ref,
188 table_stats: Default::default(),
189 last_table_stats: Default::default(),
190 epoch_set: BTreeSet::default(),
191 memory_limiter,
192 block_size_vec: Vec::new(),
193 }
194 }
195
    /// Test-only wrapper around [`Self::add`].
    pub async fn add_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add(full_key, value).await
    }
204
    /// Approximate encoded size of the block currently being built.
    pub fn current_block_size(&self) -> usize {
        self.block_builder.approximate_len()
    }
208
    /// Appends an already-encoded block (fast-compaction path) to this SST.
    ///
    /// Returns `Ok(true)` when the raw block was appended verbatim, and
    /// `Ok(false)` when the pending hand-built block was too small to flush,
    /// in which case the raw block is decoded and its entries are merged into
    /// the pending block through the normal `add_impl` path instead.
    pub async fn add_raw_block(
        &mut self,
        buf: Bytes,
        filter_data: Vec<u8>,
        smallest_key: FullKey<Vec<u8>>,
        largest_key: Vec<u8>,
        mut meta: BlockMeta,
    ) -> HummockResult<bool> {
        let table_id = smallest_key.user_key.table_id.table_id;
        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
            // Crossing a table boundary: seal the pending block and roll the
            // per-table statistics over to the new table id.
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }

            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
        }

        if !self.block_builder.is_empty() {
            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);

            if self.block_builder.approximate_len() < min_block_size {
                // Pending block is too small to stand alone: decode the raw
                // block and re-add its entries one by one so they join the
                // pending block.
                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
                iter.seek_to_first();
                while iter.is_valid() {
                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
                        panic!(
                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
                            self.sstable_id, self.block_metas.len(), self.last_table_id
                        )
                    });
                    // `could_switch_block == false`: these entries must stay
                    // in the current block/table.
                    self.add_impl(iter.key(), value, false).await?;
                    iter.next();
                }
                return Ok(false);
            }

            self.build_block().await?;
        }
        self.last_full_key = largest_key;
        assert_eq!(
            meta.len as usize,
            buf.len(),
            "meta {} buf {} last_table_id {:?}",
            meta.len,
            buf.len(),
            self.last_table_id
        );
        // Rebase the incoming block meta onto this SST's current data offset.
        meta.offset = self.writer.data_len() as u32;
        self.block_metas.push(meta);
        self.filter_builder.add_raw_data(filter_data);
        let block_meta = self.block_metas.last_mut().unwrap();
        self.writer.write_block_bytes(buf, block_meta).await?;

        Ok(true)
    }
270
    /// Adds a key-value pair to the SST, allowing block/table switches.
    pub async fn add(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add_impl(full_key, value, true).await
    }
279
    /// Adds a key-value pair to the builder.
    ///
    /// `could_switch_block` controls whether this call may seal the current
    /// block and/or start a new table; `add_raw_block` passes `false` when
    /// merging entries that must remain in the pending block.
    async fn add_impl(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        could_switch_block: bool,
    ) -> HummockResult<()> {
        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;

        let table_key_len = full_key.user_key.table_key.as_ref().len();
        let table_value_len = match &value {
            HummockValue::Put(t) => t.len(),
            HummockValue::Delete => 0,
        };
        // Unusually large keys/values are accepted but logged.
        let large_value_len = self.options.max_sst_size as usize / 10;
        let large_key_value_len = self.options.max_sst_size as usize / 2;
        if table_key_len >= LARGE_KEY_LEN
            || table_value_len > large_value_len
            || table_key_len + table_value_len > large_key_value_len
        {
            let table_id = full_key.user_key.table_id.table_id();
            tracing::warn!(
                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
                table_id,
                table_key_len,
                table_value_len,
                full_key.epoch_with_gap.pure_epoch(),
                full_key.epoch_with_gap.offset(),
            );
        }

        // Encode key/value into the reusable buffers (cleared at the end).
        full_key.encode_into(&mut self.raw_key);
        value.encode(&mut self.raw_value);
        let is_new_user_key = self.last_full_key.is_empty()
            || !user_key(&self.raw_key).eq(user_key(&self.last_full_key));
        let table_id = full_key.user_key.table_id.table_id();
        let is_new_table = self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id;
        let current_block_size = self.current_block_size();
        // Seal when full, or when >= 3/4 full and this entry would overflow.
        let is_block_full = current_block_size >= self.options.block_capacity
            || (current_block_size > self.options.block_capacity / 4 * 3
                && current_block_size + self.raw_value.len() + self.raw_key.len()
                    > self.options.block_capacity);

        if is_new_table {
            assert!(
                could_switch_block,
                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
                is_new_user_key,
                self.sstable_id,
                self.block_metas.len(),
                table_id,
                self.last_table_id,
                full_key
            );
            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }
        } else if is_block_full && could_switch_block {
            self.build_block().await?;
        }
        self.last_table_stats.total_key_count += 1;
        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());

        // First entry of a fresh block: open its metadata record. `len` and
        // `uncompressed_size` are filled in by `build_block`.
        if self.block_builder.is_empty() {
            self.block_metas.push(BlockMeta {
                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
                    panic!(
                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
                        self.writer.data_len(),
                        self.sstable_id,
                        self.block_metas.len(),
                        self.table_ids,
                    )
                }),
                len: 0,
                smallest_key: full_key.encode(),
                uncompressed_size: 0,
                total_key_count: 0,
                stale_key_count: 0,
            });
        }

        let table_id = full_key.user_key.table_id.table_id();
        // Feed the (possibly pruned) filter key into the filter builder; an
        // empty extraction means this key contributes nothing to the filter.
        let mut extract_key = user_key(&self.raw_key);
        extract_key = self.compaction_catalog_agent_ref.extract(extract_key);
        if !extract_key.is_empty() {
            self.filter_builder.add_key(extract_key, table_id);
        }
        self.block_builder.add(full_key, self.raw_value.as_ref());
        self.block_metas.last_mut().unwrap().total_key_count += 1;
        // Older versions of a user key, and deletes, count as stale.
        if !is_new_user_key || value.is_delete() {
            self.block_metas.last_mut().unwrap().stale_key_count += 1;
        }
        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
        self.last_table_stats.total_value_size += value.encoded_len() as i64;

        self.last_full_key.clear();
        self.last_full_key.extend_from_slice(&self.raw_key);

        self.raw_key.clear();
        self.raw_value.clear();
        Ok(())
    }
389
    /// Finalizes the SST: flushes the last block, builds the key filter,
    /// encodes the metadata, and returns the `SstableInfo` together with the
    /// writer output and build statistics.
    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
        let smallest_key = if self.block_metas.is_empty() {
            vec![]
        } else {
            self.block_metas[0].smallest_key.clone()
        };
        let largest_key = self.last_full_key.clone();
        self.finalize_last_table_stats();

        self.build_block().await?;
        let right_exclusive = false;
        // Metadata starts right after the last data block.
        let meta_offset = self.writer.data_len() as u64;

        let bloom_filter_kind = if self.filter_builder.support_blocked_raw_data() {
            BloomFilterType::Blocked
        } else {
            BloomFilterType::Sstable
        };
        // Skip filter construction entirely when filters are disabled.
        let bloom_filter = if self.options.bloom_false_positive > 0.0 {
            self.filter_builder.finish(self.memory_limiter.clone())
        } else {
            vec![]
        };

        // Aggregate counters across all block metas.
        let total_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.total_key_count as u64)
            .sum::<u64>();
        let stale_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.stale_key_count as u64)
            .sum::<u64>();
        let uncompressed_file_size = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.uncompressed_size as u64)
            .sum::<u64>();

        #[expect(deprecated)]
        let mut meta = SstableMeta {
            block_metas: self.block_metas,
            bloom_filter,
            // Filled in below once the encoded size is known.
            estimated_size: 0,
            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
                    total_key_count, self.table_ids,
                )
            }),
            smallest_key,
            largest_key,
            version: VERSION,
            meta_offset,
            monotonic_tombstone_events: vec![],
        };

        let meta_encode_size = meta.encoded_size();
        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
                meta_encode_size, self.table_ids,
            )
        });
        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
                meta_offset, self.table_ids,
            )
        });
        // estimated_size = data bytes (meta_offset) + encoded metadata bytes.
        meta.estimated_size = encoded_size_u32
            .checked_add(meta_offset_u32)
            .unwrap_or_else(|| {
                panic!(
                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
                )
            });

        // Averages over all tables; guard both empty stats and a zero key
        // count to avoid division by zero.
        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
            (0, 0)
        } else {
            let total_key_count: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_count as usize)
                .sum();

            if total_key_count == 0 {
                (0, 0)
            } else {
                let total_key_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_key_size as usize)
                    .sum();

                let total_value_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_value_size as usize)
                    .sum();

                (
                    total_key_size / total_key_count,
                    total_value_size / total_key_count,
                )
            }
        };

        // `epoch_set` is an ordered set, so first/last are min/max.
        let (min_epoch, max_epoch) = {
            if self.epoch_set.is_empty() {
                (HummockEpoch::MAX, u64::MIN)
            } else {
                (
                    *self.epoch_set.first().unwrap(),
                    *self.epoch_set.last().unwrap(),
                )
            }
        };

        let sst_info: SstableInfo = SstableInfoInner {
            object_id: self.sstable_id,
            sst_id: self.sstable_id,
            bloom_filter_kind,
            key_range: KeyRange {
                left: Bytes::from(meta.smallest_key.clone()),
                right: Bytes::from(meta.largest_key.clone()),
                right_exclusive,
            },
            file_size: meta.estimated_size as u64,
            table_ids: self.table_ids.into_iter().collect(),
            meta_offset: meta.meta_offset,
            stale_key_count,
            total_key_count,
            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
            min_epoch,
            max_epoch,
            range_tombstone_count: 0,
            sst_size: meta.estimated_size as u64,
        }
        .into();
        tracing::trace!(
            "meta_size {} bloom_filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
            meta.encoded_size(),
            meta.bloom_filter.len(),
            total_key_count,
            stale_key_count,
            min_epoch,
            max_epoch,
            self.epoch_set.len()
        );
        let bloom_filter_size = meta.bloom_filter.len();
        let sstable_file_size = sst_info.file_size as usize;

        // Attribute each block's compressed size to the table it belongs to.
        // Block metas are grouped by table, so the cursor only moves forward.
        if !meta.block_metas.is_empty() {
            let mut last_table_id = meta.block_metas[0].table_id().table_id();
            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
            for block_meta in &meta.block_metas {
                let block_table_id = block_meta.table_id();
                if last_table_id != block_table_id.table_id() {
                    last_table_id = block_table_id.table_id();
                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
                }

                last_table_stats.total_compressed_size += block_meta.len as u64;
            }
        }

        let writer_output = self.writer.finish(meta).await?;
        // Wall-clock creation timestamp (seconds since the Unix epoch).
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        Ok(SstableBuilderOutput::<W::Output> {
            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
            writer_output,
            stats: SstableBuilderOutputStats {
                bloom_filter_size,
                avg_key_size,
                avg_value_size,
                epoch_count: self.epoch_set.len(),
                block_size_vec: self.block_size_vec,
                sstable_file_size,
            },
        })
    }
596
    /// Approximate total size of the SST so far: written data plus the
    /// pending block plus the filter under construction.
    pub fn approximate_len(&self) -> usize {
        self.writer.data_len()
            + self.block_builder.approximate_len()
            + self.filter_builder.approximate_len()
    }
602
    /// Seals the current block: records its uncompressed size, writes it out
    /// through the writer, and completes the corresponding `BlockMeta`.
    /// No-op when the current block is empty.
    pub async fn build_block(&mut self) -> HummockResult<()> {
        if self.block_builder.is_empty() {
            return Ok(());
        }

        // The meta slot for this block was opened by `add_impl` /
        // `add_raw_block`; fill in the sizes now.
        let block_meta = self.block_metas.last_mut().unwrap();
        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
            .unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
                    uncompressed_block_size,
                    self.block_builder.table_id(),
                )
            });
        let block = self.block_builder.build();
        self.writer.write_block(block, block_meta).await?;
        self.block_size_vec.push(block.len());
        // Per-block filter builders need to observe block boundaries.
        self.filter_builder
            .switch_block(self.memory_limiter.clone());
        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
                self.writer.data_len(),
                self.block_builder.table_id(),
            )
        });
        // Compressed block length = new write offset - block start offset.
        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
            panic!(
                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
                data_len, block_meta.offset
            )
        });

        if data_len as usize > self.options.capacity * 2 {
            tracing::warn!(
                "WARN unexpected block size {} table {:?}",
                data_len,
                self.block_builder.table_id()
            );
        }

        self.block_builder.clear();
        Ok(())
    }
649
650 pub fn is_empty(&self) -> bool {
651 self.writer.data_len() > 0
652 }
653
    /// Whether the SST's approximate size has reached the configured target
    /// capacity (i.e. the caller should seal this SST and start a new one).
    pub fn reach_capacity(&self) -> bool {
        self.approximate_len() >= self.options.capacity
    }
658
659 fn finalize_last_table_stats(&mut self) {
660 if self.table_ids.is_empty() || self.last_table_id.is_none() {
661 return;
662 }
663 self.table_stats.insert(
664 self.last_table_id.unwrap(),
665 std::mem::take(&mut self.last_table_stats),
666 );
667 }
668}
669
/// Statistics gathered while building one SST, reported to compactor metrics
/// via [`SstableBuilderOutputStats::report_stats`].
pub struct SstableBuilderOutputStats {
    /// Encoded size of the key filter in bytes.
    bloom_filter_size: usize,
    /// Average encoded key size across all tables in the SST.
    avg_key_size: usize,
    /// Average encoded value size across all tables in the SST.
    avg_value_size: usize,
    /// Number of distinct pure epochs seen.
    epoch_count: usize,
    /// Size of each written block.
    block_size_vec: Vec<usize>,
    /// Total file size of the SST.
    sstable_file_size: usize,
}
678
679impl SstableBuilderOutputStats {
680 pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
681 if self.bloom_filter_size != 0 {
682 metrics
683 .sstable_bloom_filter_size
684 .observe(self.bloom_filter_size as _);
685 }
686
687 if self.sstable_file_size != 0 {
688 metrics
689 .sstable_file_size
690 .observe(self.sstable_file_size as _);
691 }
692
693 if self.avg_key_size != 0 {
694 metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
695 }
696
697 if self.avg_value_size != 0 {
698 metrics
699 .sstable_avg_value_size
700 .observe(self.avg_value_size as _);
701 }
702
703 if self.epoch_count != 0 {
704 metrics
705 .sstable_distinct_epoch_count
706 .observe(self.epoch_count as _);
707 }
708
709 if !self.block_size_vec.is_empty() {
710 for block_size in &self.block_size_vec {
711 metrics.sstable_block_size.observe(*block_size as _);
712 }
713 }
714 }
715}
716
717#[cfg(test)]
718pub(super) mod tests {
719 use std::collections::{Bound, HashMap};
720
721 use risingwave_common::catalog::TableId;
722 use risingwave_common::hash::VirtualNode;
723 use risingwave_common::util::epoch::test_epoch;
724 use risingwave_hummock_sdk::key::UserKey;
725
726 use super::*;
727 use crate::assert_bytes_eq;
728 use crate::compaction_catalog_manager::{
729 CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
730 };
731 use crate::hummock::iterator::test_utils::mock_sstable_store;
732 use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
733 use crate::hummock::test_utils::{
734 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
735 test_key_of, test_value_of,
736 };
737 use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
738 use crate::monitor::StoreLocalStatistic;
739
    /// `finish` on a builder that never received a key must still succeed,
    /// producing an (empty) SST.
    #[tokio::test]
    async fn test_empty() {
        let opt = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: 0.001,
            ..Default::default()
        };

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        b.finish().await.unwrap();
    }
762
    /// Round-trip: build an SST from `TEST_KEYS_COUNT` keys, then verify the
    /// reported key range and that the encoded metadata decodes back equal.
    #[tokio::test]
    async fn test_basic() {
        let opt = default_builder_opt_for_test();

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let mut b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        for i in 0..TEST_KEYS_COUNT {
            b.add_for_test(
                test_key_of(i).to_ref(),
                HummockValue::put(&test_value_of(i)),
            )
            .await
            .unwrap();
        }

        let output = b.finish().await.unwrap();
        let info = output.sst_info.sst_info;

        // Key range must span exactly the first and last inserted keys.
        assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
        assert_bytes_eq!(
            test_key_of(TEST_KEYS_COUNT - 1).encode(),
            info.key_range.right
        );
        let (data, meta) = output.writer_output;
        assert_eq!(info.file_size, meta.estimated_size as u64);
        // Metadata written at `meta_offset` must decode back to `meta`.
        let offset = info.meta_offset as usize;
        let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
        assert_eq!(meta2, meta);
    }
800
    /// Builds an SST with filters enabled or disabled and checks that every
    /// probed key may-match (a filter must never produce false negatives).
    ///
    /// NOTE(review): `key_count` (1000) probes only a prefix of the
    /// `TEST_KEYS_COUNT` keys actually inserted — presumably intentional to
    /// bound test time, but confirm it shouldn't be `TEST_KEYS_COUNT`.
    async fn test_with_bloom_filter<F: FilterBuilder>(with_blooms: bool) {
        let key_count = 1000;

        let opts = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            // `0.0` disables filter construction entirely.
            bloom_false_positive: if with_blooms { 0.01 } else { 0.0 },
            ..Default::default()
        };

        let sstable_store = mock_sstable_store().await;
        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
            opts,
            0,
            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        )
        .await;
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();

        assert_eq!(table.has_bloom_filter(), with_blooms);
        for i in 0..key_count {
            let full_key = test_key_of(i);
            if table.has_bloom_filter() {
                let hash = Sstable::hash_for_bloom_filter(full_key.user_key.encode().as_slice(), 0);
                let key_ref = full_key.user_key.as_ref();
                // Inserted keys must always pass the filter.
                assert!(
                    table.may_match_hash(
                        &(Bound::Included(key_ref), Bound::Included(key_ref)),
                        hash
                    ),
                    "failed at {}",
                    i
                );
            }
        }
    }
848
    /// Exercises `test_with_bloom_filter` across all filter implementations,
    /// plus the filter-disabled path.
    #[tokio::test]
    async fn test_bloom_filter() {
        test_with_bloom_filter::<Xor16FilterBuilder>(false).await;
        test_with_bloom_filter::<Xor16FilterBuilder>(true).await;
        test_with_bloom_filter::<Xor8FilterBuilder>(true).await;
        test_with_bloom_filter::<BlockedXor16FilterBuilder>(true).await;
    }
856
    /// Multi-table SST where tables 1 and 3 use a dummy (empty) filter key
    /// extractor and only table 2 contributes filter keys: all of table 2's
    /// keys must still pass the filter.
    #[tokio::test]
    async fn test_no_bloom_filter_block() {
        let opts = SstableBuilderOptions::default();
        let sstable_store = mock_sstable_store().await;
        let writer_opts = SstableWriterOptions::default();
        let object_id = 1;
        let writer = sstable_store
            .clone()
            .create_sst_writer(object_id, writer_opts);
        // Dummy extractors (tables 1 and 3) yield empty filter keys, which
        // `add_impl` skips; only table 2 feeds the filter.
        let mut filter = MultiFilterKeyExtractor::default();
        filter.register(1, FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor));
        filter.register(
            2,
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
        );
        filter.register(3, FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor));

        let table_id_to_vnode = HashMap::from_iter(vec![
            (1, VirtualNode::COUNT_FOR_TEST),
            (2, VirtualNode::COUNT_FOR_TEST),
            (3, VirtualNode::COUNT_FOR_TEST),
        ]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(1, None), (2, None), (3, None)]);

        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::Multi(filter),
            table_id_to_vnode,
            table_id_to_watermark_serde,
        ));

        let mut builder = SstableBuilder::new(
            object_id,
            writer,
            BlockedXor16FilterBuilder::new(1024),
            opts,
            compaction_catalog_agent_ref,
            None,
        );

        let key_count: usize = 10000;
        for table_id in 1..4 {
            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
            for idx in 0..key_count {
                // Reuse the buffer: keep the vnode prefix, replace the suffix.
                table_key.resize(VirtualNode::SIZE, 0);
                table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
                let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
                let v = test_value_of(idx);
                builder
                    .add(
                        FullKey::from_user_key(k, test_epoch(1)),
                        HummockValue::put(v.as_ref()),
                    )
                    .await
                    .unwrap();
            }
        }
        let ret = builder.finish().await.unwrap();
        let sst_info = ret.sst_info.sst_info.clone();
        ret.writer_output.await.unwrap().unwrap();
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();
        // Every key inserted under table 2 must may-match.
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        for idx in 0..key_count {
            table_key.resize(VirtualNode::SIZE, 0);
            table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
            let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
            let hash = Sstable::hash_for_bloom_filter(&k.encode(), 2);
            let key_ref = k.as_ref();
            assert!(
                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
            );
        }
    }
933}