1use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::{Bytes, BytesMut};
20use risingwave_common::util::row_serde::OrderedRowSerde;
21use risingwave_hummock_sdk::compaction_group::StateTableId;
22use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, user_key};
23use risingwave_hummock_sdk::key_range::KeyRange;
24use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
25use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
26use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
27use risingwave_pb::hummock::BloomFilterType;
28
29use super::utils::CompressionAlgorithm;
30use super::{
31 BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
32 DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
33};
34use crate::compaction_catalog_manager::{
35 CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
36 FullKeyFilterKeyExtractor,
37};
38use crate::hummock::sstable::{FilterBuilder, utils};
39use crate::hummock::value::HummockValue;
40use crate::hummock::{
41 Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
42};
43use crate::monitor::CompactorMetrics;
44use crate::opts::StorageOpts;
45
/// Default target size of a complete sstable: 4 MiB.
pub const DEFAULT_SSTABLE_SIZE: usize = 4 * 1024 * 1024;
/// Default false-positive rate for the bloom filter when none is configured.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
/// Default upper bound on the size of a single SST produced by compaction: 512 MiB.
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 * 1024 * 1024;
/// Lower bound (8 KiB) used in `add_raw_block` to decide whether a pending
/// partially-filled block is too small to keep and should be merged instead.
pub const MIN_BLOCK_SIZE: usize = 8 * 1024;
50
#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Approximate sstable capacity in bytes.
    pub capacity: usize,
    /// Approximate block capacity in bytes.
    pub block_capacity: usize,
    /// Restart point interval for the block builder.
    pub restart_interval: usize,
    /// False-positive probability of the bloom filter; non-positive disables it.
    pub bloom_false_positive: f64,
    /// Compression algorithm applied to blocks.
    pub compression_algorithm: CompressionAlgorithm,
    /// Maximum sstable size; also used in `add_impl` to derive the
    /// large-key/value warning thresholds.
    pub max_sst_size: u64,
}
65
66impl From<&StorageOpts> for SstableBuilderOptions {
67 fn from(options: &StorageOpts) -> SstableBuilderOptions {
68 let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
69 SstableBuilderOptions {
70 capacity,
71 block_capacity: (options.block_size_kb as usize) * (1 << 10),
72 restart_interval: DEFAULT_RESTART_INTERVAL,
73 bloom_false_positive: options.bloom_false_positive,
74 compression_algorithm: CompressionAlgorithm::None,
75 max_sst_size: options.compactor_max_sst_size,
76 }
77 }
78}
79
80impl Default for SstableBuilderOptions {
81 fn default() -> Self {
82 Self {
83 capacity: DEFAULT_SSTABLE_SIZE,
84 block_capacity: DEFAULT_BLOCK_SIZE,
85 restart_interval: DEFAULT_RESTART_INTERVAL,
86 bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
87 compression_algorithm: CompressionAlgorithm::None,
88 max_sst_size: DEFAULT_MAX_SST_SIZE,
89 }
90 }
91}
92
/// Everything produced by sealing a `SstableBuilder`: the sstable info, the
/// underlying writer's output, and statistics for metrics reporting.
pub struct SstableBuilderOutput<WO> {
    pub sst_info: LocalSstableInfo,
    /// Output of the underlying `SstableWriter::finish` (type depends on the writer).
    pub writer_output: WO,
    pub stats: SstableBuilderOutputStats,
}
98
pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Builder options.
    options: SstableBuilderOptions,
    /// Data writer the finished blocks are streamed into.
    writer: W,
    /// Builder for the block currently being filled.
    block_builder: BlockBuilder,

    /// Supplies the per-table filter-key extractor.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Metadata of all blocks written so far (last entry is the pending block).
    block_metas: Vec<BlockMeta>,

    /// `table_id`s of all added keys.
    table_ids: BTreeSet<u32>,
    /// Encoded full key of the most recently added entry.
    last_full_key: Vec<u8>,
    /// Reusable buffer for the encoded key, to avoid per-entry allocation.
    raw_key: BytesMut,
    /// Reusable buffer for the encoded value.
    raw_value: BytesMut,
    /// Table id of the most recently added entry, if any.
    last_table_id: Option<u32>,
    sst_object_id: HummockSstableObjectId,

    /// Finalized per-table statistics.
    table_stats: TableStatsMap,
    /// Stats accumulated for `last_table_id`; moved into `table_stats` by
    /// `finalize_last_table_stats` when the table switches or the build ends.
    last_table_stats: TableStats,

    filter_builder: F,

    /// Distinct pure epochs seen among the added keys.
    epoch_set: BTreeSet<u64>,
    memory_limiter: Option<Arc<MemoryLimiter>>,

    /// Compressed size of each written block, in write order (for metrics).
    block_size_vec: Vec<usize>,
}
133
134impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
135 pub fn for_test(
136 sstable_id: u64,
137 writer: W,
138 options: SstableBuilderOptions,
139 table_id_to_vnode: HashMap<StateTableId, usize>,
140 table_id_to_watermark_serde: HashMap<
141 u32,
142 Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
143 >,
144 ) -> Self {
145 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
146 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
147 table_id_to_vnode,
148 table_id_to_watermark_serde,
149 ));
150
151 Self::new(
152 sstable_id,
153 writer,
154 Xor16FilterBuilder::new(options.capacity / DEFAULT_ENTRY_SIZE + 1),
155 options,
156 compaction_catalog_agent_ref,
157 None,
158 )
159 }
160}
161
162impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
163 pub fn new(
164 sst_object_id: impl Into<HummockSstableObjectId>,
165 writer: W,
166 filter_builder: F,
167 options: SstableBuilderOptions,
168 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
169 memory_limiter: Option<Arc<MemoryLimiter>>,
170 ) -> Self {
171 let sst_object_id = sst_object_id.into();
172 Self {
173 options: options.clone(),
174 writer,
175 block_builder: BlockBuilder::new(BlockBuilderOptions {
176 capacity: options.block_capacity,
177 restart_interval: options.restart_interval,
178 compression_algorithm: options.compression_algorithm,
179 }),
180 filter_builder,
181 block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
182 table_ids: BTreeSet::new(),
183 last_table_id: None,
184 raw_key: BytesMut::new(),
185 raw_value: BytesMut::new(),
186 last_full_key: vec![],
187 sst_object_id,
188 compaction_catalog_agent_ref,
189 table_stats: Default::default(),
190 last_table_stats: Default::default(),
191 epoch_set: BTreeSet::default(),
192 memory_limiter,
193 block_size_vec: Vec::new(),
194 }
195 }
196
    /// Test-only convenience wrapper around [`Self::add`].
    pub async fn add_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add(full_key, value).await
    }
205
    /// Approximate size in bytes of the block currently being built.
    pub fn current_block_size(&self) -> usize {
        self.block_builder.approximate_len()
    }
209
    /// Adds an already-encoded block (fast-compact path), together with its
    /// raw filter data and boundary keys.
    ///
    /// Returns `Ok(true)` when the block was appended wholesale, `Ok(false)`
    /// when the pending block was too small and the raw block was instead
    /// decoded and re-added entry by entry via [`Self::add_impl`].
    pub async fn add_raw_block(
        &mut self,
        buf: Bytes,
        filter_data: Vec<u8>,
        smallest_key: FullKey<Vec<u8>>,
        largest_key: Vec<u8>,
        mut meta: BlockMeta,
    ) -> HummockResult<bool> {
        let table_id = smallest_key.user_key.table_id.table_id;
        // Table switch: seal the pending block and finalize the previous
        // table's stats before recording the new table id.
        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }

            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
        }

        if !self.block_builder.is_empty() {
            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);

            // If the pending block is still tiny, merge this raw block into it
            // entry by entry instead of producing an undersized block.
            if self.block_builder.approximate_len() < min_block_size {
                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
                iter.seek_to_first();
                while iter.is_valid() {
                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
                        panic!(
                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
                            self.sst_object_id, self.block_metas.len(), self.last_table_id
                        )
                    });
                    self.add_impl(iter.key(), value, false).await?;
                    iter.next();
                }
                return Ok(false);
            }

            self.build_block().await?;
        }
        self.last_full_key = largest_key;
        // The block meta must describe exactly the bytes we are about to write.
        assert_eq!(
            meta.len as usize,
            buf.len(),
            "meta {} buf {} last_table_id {:?}",
            meta.len,
            buf.len(),
            self.last_table_id
        );
        meta.offset = self.writer.data_len() as u32;
        self.block_metas.push(meta);
        self.filter_builder.add_raw_data(filter_data);
        let block_meta = self.block_metas.last_mut().unwrap();
        self.writer.write_block_bytes(buf, block_meta).await?;

        Ok(true)
    }
271
    /// Adds a key-value pair to the sstable; may seal the current block.
    pub async fn add(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add_impl(full_key, value, true).await
    }
280
281 async fn add_impl(
283 &mut self,
284 full_key: FullKey<&[u8]>,
285 value: HummockValue<&[u8]>,
286 could_switch_block: bool,
287 ) -> HummockResult<()> {
288 const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;
289
290 let table_key_len = full_key.user_key.table_key.as_ref().len();
291 let table_value_len = match &value {
292 HummockValue::Put(t) => t.len(),
293 HummockValue::Delete => 0,
294 };
295 let large_value_len = self.options.max_sst_size as usize / 10;
296 let large_key_value_len = self.options.max_sst_size as usize / 2;
297 if table_key_len >= LARGE_KEY_LEN
298 || table_value_len > large_value_len
299 || table_key_len + table_value_len > large_key_value_len
300 {
301 let table_id = full_key.user_key.table_id.table_id();
302 tracing::warn!(
303 "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
304 table_id,
305 table_key_len,
306 table_value_len,
307 full_key.epoch_with_gap.pure_epoch(),
308 full_key.epoch_with_gap.offset(),
309 );
310 }
311
312 full_key.encode_into(&mut self.raw_key);
314 value.encode(&mut self.raw_value);
315 let is_new_user_key = self.last_full_key.is_empty()
316 || !user_key(&self.raw_key).eq(user_key(&self.last_full_key));
317 let table_id = full_key.user_key.table_id.table_id();
318 let is_new_table = self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id;
319 let current_block_size = self.current_block_size();
320 let is_block_full = current_block_size >= self.options.block_capacity
321 || (current_block_size > self.options.block_capacity / 4 * 3
322 && current_block_size + self.raw_value.len() + self.raw_key.len()
323 > self.options.block_capacity);
324
325 if is_new_table {
326 assert!(
327 could_switch_block,
328 "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
329 is_new_user_key,
330 self.sst_object_id,
331 self.block_metas.len(),
332 table_id,
333 self.last_table_id,
334 full_key
335 );
336 self.table_ids.insert(table_id);
337 self.finalize_last_table_stats();
338 self.last_table_id = Some(table_id);
339 if !self.block_builder.is_empty() {
340 self.build_block().await?;
341 }
342 } else if is_block_full && could_switch_block {
343 self.build_block().await?;
344 }
345 self.last_table_stats.total_key_count += 1;
346 self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());
347
348 if self.block_builder.is_empty() {
350 self.block_metas.push(BlockMeta {
351 offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
352 panic!(
353 "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
354 self.writer.data_len(),
355 self.sst_object_id,
356 self.block_metas.len(),
357 self.table_ids,
358 )
359 }),
360 len: 0,
361 smallest_key: full_key.encode(),
362 uncompressed_size: 0,
363 total_key_count: 0,
364 stale_key_count: 0,
365 });
366 }
367
368 let table_id = full_key.user_key.table_id.table_id();
369 let mut extract_key = user_key(&self.raw_key);
370 extract_key = self.compaction_catalog_agent_ref.extract(extract_key);
371 if !extract_key.is_empty() {
373 self.filter_builder.add_key(extract_key, table_id);
374 }
375 self.block_builder.add(full_key, self.raw_value.as_ref());
376 self.block_metas.last_mut().unwrap().total_key_count += 1;
377 if !is_new_user_key || value.is_delete() {
378 self.block_metas.last_mut().unwrap().stale_key_count += 1;
379 }
380 self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
381 self.last_table_stats.total_value_size += value.encoded_len() as i64;
382
383 self.last_full_key.clear();
384 self.last_full_key.extend_from_slice(&self.raw_key);
385
386 self.raw_key.clear();
387 self.raw_value.clear();
388 Ok(())
389 }
390
    /// Seals the sstable: flushes the pending block, assembles the metadata
    /// (bloom filter, key range, per-table stats), and finishes the writer.
    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
        // The sstable's smallest key is the smallest key of its first block.
        let smallest_key = if self.block_metas.is_empty() {
            vec![]
        } else {
            self.block_metas[0].smallest_key.clone()
        };
        let largest_key = self.last_full_key.clone();
        self.finalize_last_table_stats();

        self.build_block().await?;
        let right_exclusive = false;
        let meta_offset = self.writer.data_len() as u64;

        let bloom_filter_kind = if self.filter_builder.support_blocked_raw_data() {
            BloomFilterType::Blocked
        } else {
            BloomFilterType::Sstable
        };
        // A non-positive false-positive rate disables the bloom filter.
        let bloom_filter = if self.options.bloom_false_positive > 0.0 {
            self.filter_builder.finish(self.memory_limiter.clone())
        } else {
            vec![]
        };

        // Aggregate counters over all written blocks.
        let total_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.total_key_count as u64)
            .sum::<u64>();
        let stale_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.stale_key_count as u64)
            .sum::<u64>();
        let uncompressed_file_size = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.uncompressed_size as u64)
            .sum::<u64>();

        #[expect(deprecated)]
        let mut meta = SstableMeta {
            block_metas: self.block_metas,
            bloom_filter,
            // Filled in below once the encoded size is known.
            estimated_size: 0,
            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
                    total_key_count, self.table_ids,
                )
            }),
            smallest_key,
            largest_key,
            version: VERSION,
            meta_offset,
            monotonic_tombstone_events: vec![],
        };

        // estimated_size = data section length + encoded metadata length,
        // with every step checked for u32 overflow.
        let meta_encode_size = meta.encoded_size();
        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
                meta_encode_size, self.table_ids,
            )
        });
        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
                meta_offset, self.table_ids,
            )
        });
        meta.estimated_size = encoded_size_u32
            .checked_add(meta_offset_u32)
            .unwrap_or_else(|| {
                panic!(
                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
                )
            });

        // Average key/value sizes across all tables, guarding both the
        // "no tables" and "zero keys" divisions.
        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
            (0, 0)
        } else {
            let total_key_count: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_count as usize)
                .sum();

            if total_key_count == 0 {
                (0, 0)
            } else {
                let total_key_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_key_size as usize)
                    .sum();

                let total_value_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_value_size as usize)
                    .sum();

                (
                    total_key_size / total_key_count,
                    total_value_size / total_key_count,
                )
            }
        };

        // `epoch_set` is ordered, so first/last are min/max. (MAX, MIN) is the
        // sentinel pair for an empty set.
        let (min_epoch, max_epoch) = {
            if self.epoch_set.is_empty() {
                (HummockEpoch::MAX, u64::MIN)
            } else {
                (
                    *self.epoch_set.first().unwrap(),
                    *self.epoch_set.last().unwrap(),
                )
            }
        };

        let sst_info: SstableInfo = SstableInfoInner {
            object_id: self.sst_object_id,
            sst_id: self.sst_object_id.inner().into(),
            bloom_filter_kind,
            key_range: KeyRange {
                left: Bytes::from(meta.smallest_key.clone()),
                right: Bytes::from(meta.largest_key.clone()),
                right_exclusive,
            },
            file_size: meta.estimated_size as u64,
            table_ids: self.table_ids.into_iter().collect(),
            meta_offset: meta.meta_offset,
            stale_key_count,
            total_key_count,
            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
            min_epoch,
            max_epoch,
            range_tombstone_count: 0,
            sst_size: meta.estimated_size as u64,
        }
        .into();
        tracing::trace!(
            "meta_size {} bloom_filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
            meta.encoded_size(),
            meta.bloom_filter.len(),
            total_key_count,
            stale_key_count,
            min_epoch,
            max_epoch,
            self.epoch_set.len()
        );
        let bloom_filter_size = meta.bloom_filter.len();
        let sstable_file_size = sst_info.file_size as usize;

        // Attribute each block's compressed size to its table. Blocks are
        // contiguous per table, so the stats entry only changes at boundaries.
        if !meta.block_metas.is_empty() {
            let mut last_table_id = meta.block_metas[0].table_id().table_id();
            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
            for block_meta in &meta.block_metas {
                let block_table_id = block_meta.table_id();
                if last_table_id != block_table_id.table_id() {
                    last_table_id = block_table_id.table_id();
                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
                }

                last_table_stats.total_compressed_size += block_meta.len as u64;
            }
        }

        let writer_output = self.writer.finish(meta).await?;
        // Creation timestamp recorded in the local sstable info (unix seconds).
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        Ok(SstableBuilderOutput::<W::Output> {
            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
            writer_output,
            stats: SstableBuilderOutputStats {
                bloom_filter_size,
                avg_key_size,
                avg_value_size,
                epoch_count: self.epoch_set.len(),
                block_size_vec: self.block_size_vec,
                sstable_file_size,
            },
        })
    }
598
599 pub fn approximate_len(&self) -> usize {
600 self.writer.data_len()
601 + self.block_builder.approximate_len()
602 + self.filter_builder.approximate_len()
603 }
604
    /// Compresses and writes the pending block (if any), fixing up its
    /// `BlockMeta` with the uncompressed and on-disk sizes.
    pub async fn build_block(&mut self) -> HummockResult<()> {
        // Nothing pending: no-op.
        if self.block_builder.is_empty() {
            return Ok(());
        }

        let block_meta = self.block_metas.last_mut().unwrap();
        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
            .unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
                    uncompressed_block_size,
                    self.block_builder.table_id(),
                )
            });
        let block = self.block_builder.build();
        self.writer.write_block(block, block_meta).await?;
        self.block_size_vec.push(block.len());
        self.filter_builder
            .switch_block(self.memory_limiter.clone());
        // The block's on-disk length is the writer's position delta since the
        // block started at `block_meta.offset`.
        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
                self.writer.data_len(),
                self.block_builder.table_id(),
            )
        });
        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
            panic!(
                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
                data_len, block_meta.offset
            )
        });

        // Warn if the total written data already exceeds twice the configured
        // sstable capacity.
        if data_len as usize > self.options.capacity * 2 {
            tracing::warn!(
                "WARN unexpected block size {} table {:?}",
                data_len,
                self.block_builder.table_id()
            );
        }

        self.block_builder.clear();
        Ok(())
    }
651
652 pub fn is_empty(&self) -> bool {
653 self.writer.data_len() > 0
654 }
655
    /// Returns `true` once the approximate size reaches the configured capacity.
    pub fn reach_capacity(&self) -> bool {
        self.approximate_len() >= self.options.capacity
    }
660
661 fn finalize_last_table_stats(&mut self) {
662 if self.table_ids.is_empty() || self.last_table_id.is_none() {
663 return;
664 }
665 self.table_stats.insert(
666 self.last_table_id.unwrap(),
667 std::mem::take(&mut self.last_table_stats),
668 );
669 }
670}
671
/// Builder-level statistics reported to `CompactorMetrics` after an sstable
/// is sealed.
pub struct SstableBuilderOutputStats {
    bloom_filter_size: usize,
    avg_key_size: usize,
    avg_value_size: usize,
    /// Number of distinct epochs among the added keys.
    epoch_count: usize,
    /// Compressed size of each written block, in write order.
    block_size_vec: Vec<usize>,
    sstable_file_size: usize,
}
680
681impl SstableBuilderOutputStats {
682 pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
683 if self.bloom_filter_size != 0 {
684 metrics
685 .sstable_bloom_filter_size
686 .observe(self.bloom_filter_size as _);
687 }
688
689 if self.sstable_file_size != 0 {
690 metrics
691 .sstable_file_size
692 .observe(self.sstable_file_size as _);
693 }
694
695 if self.avg_key_size != 0 {
696 metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
697 }
698
699 if self.avg_value_size != 0 {
700 metrics
701 .sstable_avg_value_size
702 .observe(self.avg_value_size as _);
703 }
704
705 if self.epoch_count != 0 {
706 metrics
707 .sstable_distinct_epoch_count
708 .observe(self.epoch_count as _);
709 }
710
711 if !self.block_size_vec.is_empty() {
712 for block_size in &self.block_size_vec {
713 metrics.sstable_block_size.observe(*block_size as _);
714 }
715 }
716 }
717}
718
719#[cfg(test)]
720pub(super) mod tests {
721 use std::collections::{Bound, HashMap};
722
723 use risingwave_common::catalog::TableId;
724 use risingwave_common::hash::VirtualNode;
725 use risingwave_common::util::epoch::test_epoch;
726 use risingwave_hummock_sdk::key::UserKey;
727
728 use super::*;
729 use crate::assert_bytes_eq;
730 use crate::compaction_catalog_manager::{
731 CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
732 };
733 use crate::hummock::iterator::test_utils::mock_sstable_store;
734 use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
735 use crate::hummock::test_utils::{
736 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
737 test_key_of, test_value_of,
738 };
739 use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
740 use crate::monitor::StoreLocalStatistic;
741
    /// Finishing a builder with no keys must still succeed and produce a
    /// (degenerate) sstable; `capacity: 0` forces the smallest configuration.
    #[tokio::test]
    async fn test_empty() {
        let opt = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: 0.001,
            ..Default::default()
        };

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        b.finish().await.unwrap();
    }
764
    /// End-to-end build: the resulting key range must span exactly the first
    /// and last inserted keys, and the metadata written at `meta_offset` must
    /// round-trip through `SstableMeta::decode`.
    #[tokio::test]
    async fn test_basic() {
        let opt = default_builder_opt_for_test();

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let mut b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        for i in 0..TEST_KEYS_COUNT {
            b.add_for_test(
                test_key_of(i).to_ref(),
                HummockValue::put(&test_value_of(i)),
            )
            .await
            .unwrap();
        }

        let output = b.finish().await.unwrap();
        let info = output.sst_info.sst_info;

        assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
        assert_bytes_eq!(
            test_key_of(TEST_KEYS_COUNT - 1).encode(),
            info.key_range.right
        );
        let (data, meta) = output.writer_output;
        assert_eq!(info.file_size, meta.estimated_size as u64);
        let offset = info.meta_offset as usize;
        let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
        assert_eq!(meta2, meta);
    }
802
    /// Builds an sstable with or without a bloom filter and verifies that
    /// every inserted key may-match against the filter (no false negatives).
    async fn test_with_bloom_filter<F: FilterBuilder>(with_blooms: bool) {
        // NOTE(review): keys are generated for 0..TEST_KEYS_COUNT but probed
        // for 0..key_count (1000) below; this assumes TEST_KEYS_COUNT >= 1000
        // — confirm against `test_utils`.
        let key_count = 1000;

        let opts = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            // A rate of 0.0 disables the bloom filter entirely.
            bloom_false_positive: if with_blooms { 0.01 } else { 0.0 },
            ..Default::default()
        };

        // Build a remote sstable and load it back through the store.
        let sstable_store = mock_sstable_store().await;
        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
            opts,
            0,
            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        )
        .await;
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();

        assert_eq!(table.has_bloom_filter(), with_blooms);
        for i in 0..key_count {
            let full_key = test_key_of(i);
            if table.has_bloom_filter() {
                let hash = Sstable::hash_for_bloom_filter(full_key.user_key.encode().as_slice(), 0);
                let key_ref = full_key.user_key.as_ref();
                assert!(
                    table.may_match_hash(
                        &(Bound::Included(key_ref), Bound::Included(key_ref)),
                        hash
                    ),
                    "failed at {}",
                    i
                );
            }
        }
    }
850
    /// Exercises all filter-builder variants, with and without a filter.
    #[tokio::test]
    async fn test_bloom_filter() {
        test_with_bloom_filter::<Xor16FilterBuilder>(false).await;
        test_with_bloom_filter::<Xor16FilterBuilder>(true).await;
        test_with_bloom_filter::<Xor8FilterBuilder>(true).await;
        test_with_bloom_filter::<BlockedXor16FilterBuilder>(true).await;
    }
858
    /// With a multi-table extractor where tables 1 and 3 use the dummy
    /// (opt-out) extractor and table 2 uses the full-key extractor, keys from
    /// table 2 must still may-match against the filter.
    #[tokio::test]
    async fn test_no_bloom_filter_block() {
        let opts = SstableBuilderOptions::default();
        // Build a remote sstable through a real writer.
        let sstable_store = mock_sstable_store().await;
        let writer_opts = SstableWriterOptions::default();
        let object_id = 1;
        let writer = sstable_store
            .clone()
            .create_sst_writer(object_id, writer_opts);
        let mut filter = MultiFilterKeyExtractor::default();
        filter.register(1, FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor));
        filter.register(
            2,
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
        );
        filter.register(3, FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor));

        let table_id_to_vnode = HashMap::from_iter(vec![
            (1, VirtualNode::COUNT_FOR_TEST),
            (2, VirtualNode::COUNT_FOR_TEST),
            (3, VirtualNode::COUNT_FOR_TEST),
        ]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(1, None), (2, None), (3, None)]);

        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::Multi(filter),
            table_id_to_vnode,
            table_id_to_watermark_serde,
        ));

        let mut builder = SstableBuilder::new(
            object_id,
            writer,
            BlockedXor16FilterBuilder::new(1024),
            opts,
            compaction_catalog_agent_ref,
            None,
        );

        // Insert `key_count` keys for each of the three tables.
        let key_count: usize = 10000;
        for table_id in 1..4 {
            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
            for idx in 0..key_count {
                // Truncate back to the vnode prefix before appending the key.
                table_key.resize(VirtualNode::SIZE, 0);
                table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
                let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
                let v = test_value_of(idx);
                builder
                    .add(
                        FullKey::from_user_key(k, test_epoch(1)),
                        HummockValue::put(v.as_ref()),
                    )
                    .await
                    .unwrap();
            }
        }
        let ret = builder.finish().await.unwrap();
        let sst_info = ret.sst_info.sst_info.clone();
        ret.writer_output.await.unwrap().unwrap();
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();
        // Table 2 used the full-key extractor, so its keys must may-match.
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        for idx in 0..key_count {
            table_key.resize(VirtualNode::SIZE, 0);
            table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
            let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
            let hash = Sstable::hash_for_bloom_filter(&k.encode(), 2);
            let key_ref = k.as_ref();
            assert!(
                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
            );
        }
    }
935}