use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::time::SystemTime;

use bytes::{Bytes, BytesMut};
use risingwave_common::catalog::TableId;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, user_key};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
use risingwave_pb::hummock::BloomFilterType;

use super::utils::CompressionAlgorithm;
use super::{
    BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
    DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
    FullKeyFilterKeyExtractor,
};
use crate::hummock::sstable::{FilterBuilder, utils};
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
};
use crate::monitor::CompactorMetrics;
use crate::opts::StorageOpts;

pub const DEFAULT_SSTABLE_SIZE: usize = 4 * 1024 * 1024;
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 * 1024 * 1024;
pub const MIN_BLOCK_SIZE: usize = 8 * 1024;

#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Approximate SSTable capacity in bytes.
    pub capacity: usize,
    /// Approximate block capacity in bytes.
    pub block_capacity: usize,
    /// Restart point interval of each block.
    pub restart_interval: usize,
    /// False positive probability of the bloom filter.
    pub bloom_false_positive: f64,
    /// Compression algorithm applied to each block.
    pub compression_algorithm: CompressionAlgorithm,
    /// Target upper bound of a single SSTable's size.
    pub max_sst_size: u64,
}

impl From<&StorageOpts> for SstableBuilderOptions {
    fn from(options: &StorageOpts) -> SstableBuilderOptions {
        let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
        SstableBuilderOptions {
            capacity,
            block_capacity: (options.block_size_kb as usize) * (1 << 10),
            restart_interval: DEFAULT_RESTART_INTERVAL,
            bloom_false_positive: options.bloom_false_positive,
            compression_algorithm: CompressionAlgorithm::None,
            max_sst_size: options.compactor_max_sst_size,
        }
    }
}

impl Default for SstableBuilderOptions {
    fn default() -> Self {
        Self {
            capacity: DEFAULT_SSTABLE_SIZE,
            block_capacity: DEFAULT_BLOCK_SIZE,
            restart_interval: DEFAULT_RESTART_INTERVAL,
            bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
            compression_algorithm: CompressionAlgorithm::None,
            max_sst_size: DEFAULT_MAX_SST_SIZE,
        }
    }
}
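
// A minimal construction sketch (illustrative only): callers usually start from
// `Default` and override a few fields with struct-update syntax, as the tests in
// this file do. `CompressionAlgorithm::Lz4` here is just an example value.
//
//     let opt = SstableBuilderOptions {
//         capacity: 0,
//         compression_algorithm: CompressionAlgorithm::Lz4,
//         ..Default::default()
//     };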

pub struct SstableBuilderOutput<WO> {
    pub sst_info: LocalSstableInfo,
    pub writer_output: WO,
    pub stats: SstableBuilderOutputStats,
}

pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Options.
    options: SstableBuilderOptions,
    /// Data writer.
    writer: W,
    /// Current block builder.
    block_builder: BlockBuilder,

    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Metadata of every block written so far.
    block_metas: Vec<BlockMeta>,

    /// `table_id`s of all added keys.
    table_ids: BTreeSet<TableId>,
    last_full_key: Vec<u8>,
    /// Buffers for the encoded key and value, reused to avoid allocation.
    raw_key: BytesMut,
    raw_value: BytesMut,
    last_table_id: Option<TableId>,
    sst_object_id: HummockSstableObjectId,

    /// Per-table statistics.
    table_stats: TableStatsMap,
    /// Stats accumulated for `last_table_id`; merged into `table_stats` by
    /// `finalize_last_table_stats` whenever the table switches.
    last_table_stats: TableStats,

    filter_builder: F,

    epoch_set: BTreeSet<u64>,
    memory_limiter: Option<Arc<MemoryLimiter>>,

    block_size_vec: Vec<usize>, // for block size statistics
}

impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
    pub fn for_test(
        sstable_id: u64,
        writer: W,
        options: SstableBuilderOptions,
        table_id_to_vnode: HashMap<impl Into<TableId>, usize>,
        table_id_to_watermark_serde: HashMap<
            impl Into<TableId>,
            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
        >,
    ) -> Self {
        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
            table_id_to_vnode
                .into_iter()
                .map(|(table_id, v)| (table_id.into(), v))
                .collect(),
            table_id_to_watermark_serde
                .into_iter()
                .map(|(table_id, v)| (table_id.into(), v))
                .collect(),
        ));

        Self::new(
            sstable_id,
            writer,
            Xor16FilterBuilder::new(options.capacity / DEFAULT_ENTRY_SIZE + 1),
            options,
            compaction_catalog_agent_ref,
            None,
        )
    }
}
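
// Typical build lifecycle, as a hedged sketch (the `object_id`, `writer`, `filter`,
// and `agent` values are assumed to come from the surrounding compaction code):
//
//     let mut builder = SstableBuilder::new(object_id, writer, filter, opts, agent, None);
//     builder.add(full_key, HummockValue::put(value)).await?; // repeat per key
//     let output = builder.finish().await?;                   // seals blocks, filter, meta
//
// Keys must be added in ascending `FullKey` order so that the user-key, table, and
// block switches in `add_impl` below stay correct.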

impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
    pub fn new(
        sst_object_id: impl Into<HummockSstableObjectId>,
        writer: W,
        filter_builder: F,
        options: SstableBuilderOptions,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        memory_limiter: Option<Arc<MemoryLimiter>>,
    ) -> Self {
        let sst_object_id = sst_object_id.into();
        Self {
            options: options.clone(),
            writer,
            block_builder: BlockBuilder::new(BlockBuilderOptions {
                capacity: options.block_capacity,
                restart_interval: options.restart_interval,
                compression_algorithm: options.compression_algorithm,
            }),
            filter_builder,
            block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
            table_ids: BTreeSet::new(),
            last_table_id: None,
            raw_key: BytesMut::new(),
            raw_value: BytesMut::new(),
            last_full_key: vec![],
            sst_object_id,
            compaction_catalog_agent_ref,
            table_stats: Default::default(),
            last_table_stats: Default::default(),
            epoch_set: BTreeSet::default(),
            memory_limiter,
            block_size_vec: Vec::new(),
        }
    }

    pub async fn add_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add(full_key, value).await
    }

    pub fn current_block_size(&self) -> usize {
        self.block_builder.approximate_len()
    }

    /// Adds a pre-built raw block to this SSTable. Returns `true` if the block was
    /// appended verbatim, or `false` if its entries were replayed into the current
    /// block instead (see the small-block merge path below).
    pub async fn add_raw_block(
        &mut self,
        buf: Bytes,
        filter_data: Vec<u8>,
        smallest_key: FullKey<Vec<u8>>,
        largest_key: Vec<u8>,
        mut meta: BlockMeta,
    ) -> HummockResult<bool> {
        let table_id = smallest_key.user_key.table_id;
        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }

            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
        }
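
        // If a small partial block is already buffered, flushing it now would leave an
        // undersized block behind. Below `min_block_size` we instead decode the incoming
        // raw block and replay its entries through `add_impl`, merging the two blocks.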
        if !self.block_builder.is_empty() {
            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);

            if self.block_builder.approximate_len() < min_block_size {
                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
                iter.seek_to_first();
                while iter.is_valid() {
                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
                        panic!(
                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
                            self.sst_object_id, self.block_metas.len(), self.last_table_id
                        )
                    });
                    self.add_impl(iter.key(), value, false).await?;
                    iter.next();
                }
                return Ok(false);
            }

            self.build_block().await?;
        }
        self.last_full_key = largest_key;
        assert_eq!(
            meta.len as usize,
            buf.len(),
            "meta {} buf {} last_table_id {:?}",
            meta.len,
            buf.len(),
            self.last_table_id
        );
        meta.offset = self.writer.data_len() as u32;
        self.block_metas.push(meta);
        self.filter_builder.add_raw_data(filter_data);
        let block_meta = self.block_metas.last_mut().unwrap();
        self.writer.write_block_bytes(buf, block_meta).await?;

        Ok(true)
    }

    pub async fn add(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add_impl(full_key, value, true).await
    }

    async fn add_impl(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        could_switch_block: bool,
    ) -> HummockResult<()> {
        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;

        let table_key_len = full_key.user_key.table_key.as_ref().len();
        let table_value_len = match &value {
            HummockValue::Put(t) => t.len(),
            HummockValue::Delete => 0,
        };
        let large_value_len = self.options.max_sst_size as usize / 10;
        let large_key_value_len = self.options.max_sst_size as usize / 2;
        if table_key_len >= LARGE_KEY_LEN
            || table_value_len > large_value_len
            || table_key_len + table_value_len > large_key_value_len
        {
            let table_id = full_key.user_key.table_id;
            tracing::warn!(
                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
                table_id,
                table_key_len,
                table_value_len,
                full_key.epoch_with_gap.pure_epoch(),
                full_key.epoch_with_gap.offset(),
            );
        }

        full_key.encode_into(&mut self.raw_key);
        value.encode(&mut self.raw_value);
        let is_new_user_key = self.last_full_key.is_empty()
            || !user_key(&self.raw_key).eq(user_key(&self.last_full_key));
        let table_id = full_key.user_key.table_id;
        let is_new_table = self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id;
        let current_block_size = self.current_block_size();
        let is_block_full = current_block_size >= self.options.block_capacity
            || (current_block_size > self.options.block_capacity / 4 * 3
                && current_block_size + self.raw_value.len() + self.raw_key.len()
                    > self.options.block_capacity);
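
        // Worked example of the heuristic above, with an assumed (illustrative)
        // `block_capacity` of 64 KiB: a 50 KiB block (more than 3/4 of capacity)
        // still accepts a 2 KiB entry (52 <= 64) but is flushed before a 20 KiB one
        // (50 + 20 > 64); below the 48 KiB mark, entries are appended unconditionally,
        // so a block may overshoot capacity by at most one entry.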

        if is_new_table {
            assert!(
                could_switch_block,
                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
                is_new_user_key,
                self.sst_object_id,
                self.block_metas.len(),
                table_id,
                self.last_table_id,
                full_key
            );
            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }
        } else if is_block_full && could_switch_block {
            self.build_block().await?;
        }
        self.last_table_stats.total_key_count += 1;
        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());

        // A flushed (or brand-new) block builder starts a fresh block: record its meta now.
        if self.block_builder.is_empty() {
            self.block_metas.push(BlockMeta {
                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
                    panic!(
                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
                        self.writer.data_len(),
                        self.sst_object_id,
                        self.block_metas.len(),
                        self.table_ids,
                    )
                }),
                len: 0,
                smallest_key: full_key.encode(),
                uncompressed_size: 0,
                total_key_count: 0,
                stale_key_count: 0,
            });
        }

        let table_id = full_key.user_key.table_id;
        let mut extract_key = user_key(&self.raw_key);
        extract_key = self.compaction_catalog_agent_ref.extract(extract_key);
        // Add the key to the filter only if the extractor yields a non-empty filter key.
        if !extract_key.is_empty() {
            self.filter_builder
                .add_key(extract_key, table_id.as_raw_id());
        }
        self.block_builder.add(full_key, self.raw_value.as_ref());
        self.block_metas.last_mut().unwrap().total_key_count += 1;
        if !is_new_user_key || value.is_delete() {
            self.block_metas.last_mut().unwrap().stale_key_count += 1;
        }
        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
        self.last_table_stats.total_value_size += value.encoded_len() as i64;

        self.last_full_key.clear();
        self.last_full_key.extend_from_slice(&self.raw_key);

        self.raw_key.clear();
        self.raw_value.clear();
        Ok(())
    }

    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
        let smallest_key = if self.block_metas.is_empty() {
            vec![]
        } else {
            self.block_metas[0].smallest_key.clone()
        };
        let largest_key = self.last_full_key.clone();
        self.finalize_last_table_stats();

        self.build_block().await?;
        let right_exclusive = false;
        let meta_offset = self.writer.data_len() as u64;

        let bloom_filter_kind = if self.filter_builder.support_blocked_raw_data() {
            BloomFilterType::Blocked
        } else {
            BloomFilterType::Sstable
        };
        let bloom_filter = if self.options.bloom_false_positive > 0.0 {
            self.filter_builder.finish(self.memory_limiter.clone())
        } else {
            vec![]
        };

        let total_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.total_key_count as u64)
            .sum::<u64>();
        let stale_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.stale_key_count as u64)
            .sum::<u64>();
        let uncompressed_file_size = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.uncompressed_size as u64)
            .sum::<u64>();

        #[expect(deprecated)]
        let mut meta = SstableMeta {
            block_metas: self.block_metas,
            bloom_filter,
            estimated_size: 0,
            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
                    total_key_count, self.table_ids,
                )
            }),
            smallest_key,
            largest_key,
            version: VERSION,
            meta_offset,
            monotonic_tombstone_events: vec![],
        };

        let meta_encode_size = meta.encoded_size();
        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
                meta_encode_size, self.table_ids,
            )
        });
        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
                meta_offset, self.table_ids,
            )
        });
        meta.estimated_size = encoded_size_u32
            .checked_add(meta_offset_u32)
            .unwrap_or_else(|| {
                panic!(
                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
                )
            });

        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
            (0, 0)
        } else {
            let total_key_count: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_count as usize)
                .sum();

            if total_key_count == 0 {
                (0, 0)
            } else {
                let total_key_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_key_size as usize)
                    .sum();

                let total_value_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_value_size as usize)
                    .sum();

                (
                    total_key_size / total_key_count,
                    total_value_size / total_key_count,
                )
            }
        };

        let (min_epoch, max_epoch) = {
            if self.epoch_set.is_empty() {
                (HummockEpoch::MAX, u64::MIN)
            } else {
                (
                    *self.epoch_set.first().unwrap(),
                    *self.epoch_set.last().unwrap(),
                )
            }
        };

        let sst_info: SstableInfo = SstableInfoInner {
            object_id: self.sst_object_id,
            sst_id: self.sst_object_id.inner().into(),
            bloom_filter_kind,
            key_range: KeyRange {
                left: Bytes::from(meta.smallest_key.clone()),
                right: Bytes::from(meta.largest_key.clone()),
                right_exclusive,
            },
            file_size: meta.estimated_size as u64,
            table_ids: self.table_ids.into_iter().collect(),
            meta_offset: meta.meta_offset,
            stale_key_count,
            total_key_count,
            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
            min_epoch,
            max_epoch,
            range_tombstone_count: 0,
            sst_size: meta.estimated_size as u64,
        }
        .into();
        tracing::trace!(
            "meta_size {} bloom_filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
            meta.encoded_size(),
            meta.bloom_filter.len(),
            total_key_count,
            stale_key_count,
            min_epoch,
            max_epoch,
            self.epoch_set.len()
        );
        let bloom_filter_size = meta.bloom_filter.len();
        let sstable_file_size = sst_info.file_size as usize;

        // Attribute each block's compressed size to the table the block belongs to.
        if !meta.block_metas.is_empty() {
            let mut last_table_id = meta.block_metas[0].table_id();
            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
            for block_meta in &meta.block_metas {
                let block_table_id = block_meta.table_id();
                if last_table_id != block_table_id {
                    last_table_id = block_table_id;
                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
                }

                last_table_stats.total_compressed_size += block_meta.len as u64;
            }
        }

        let writer_output = self.writer.finish(meta).await?;
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        Ok(SstableBuilderOutput::<W::Output> {
            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
            writer_output,
            stats: SstableBuilderOutputStats {
                bloom_filter_size,
                avg_key_size,
                avg_value_size,
                epoch_count: self.epoch_set.len(),
                block_size_vec: self.block_size_vec,
                sstable_file_size,
            },
        })
    }
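
    // The on-disk layout produced by `finish` is `[block 0 .. block n-1][encoded SstableMeta]`,
    // so a reader can recover the meta from `meta_offset` alone. A hedged sketch, mirroring
    // the round-trip asserted in `tests::test_basic`:
    //
    //     let offset = sst_info.meta_offset as usize;
    //     let meta = SstableMeta::decode(&data[offset..])?;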

    pub fn approximate_len(&self) -> usize {
        self.writer.data_len()
            + self.block_builder.approximate_len()
            + self.filter_builder.approximate_len()
    }

    pub async fn build_block(&mut self) -> HummockResult<()> {
        if self.block_builder.is_empty() {
            return Ok(());
        }

        let block_meta = self.block_metas.last_mut().unwrap();
        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
            .unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
                    uncompressed_block_size,
                    self.block_builder.table_id(),
                )
            });
        let block = self.block_builder.build();
        self.writer.write_block(block, block_meta).await?;
        self.block_size_vec.push(block.len());
        self.filter_builder
            .switch_block(self.memory_limiter.clone());
        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
                self.writer.data_len(),
                self.block_builder.table_id(),
            )
        });
        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
            panic!(
                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
                data_len, block_meta.offset
            )
        });

        if data_len as usize > self.options.capacity * 2 {
            tracing::warn!(
                "WARN unexpected block size {} table {:?}",
                data_len,
                self.block_builder.table_id()
            );
        }

        self.block_builder.clear();
        Ok(())
    }

    pub fn is_empty(&self) -> bool {
        self.writer.data_len() == 0 && self.block_builder.is_empty()
    }

    pub fn reach_capacity(&self) -> bool {
        self.approximate_len() >= self.options.capacity
    }

    fn finalize_last_table_stats(&mut self) {
        if self.table_ids.is_empty() || self.last_table_id.is_none() {
            return;
        }
        self.table_stats.insert(
            self.last_table_id.unwrap(),
            std::mem::take(&mut self.last_table_stats),
        );
    }
}

pub struct SstableBuilderOutputStats {
    bloom_filter_size: usize,
    avg_key_size: usize,
    avg_value_size: usize,
    epoch_count: usize,
    block_size_vec: Vec<usize>,
    sstable_file_size: usize,
}

impl SstableBuilderOutputStats {
    pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
        if self.bloom_filter_size != 0 {
            metrics
                .sstable_bloom_filter_size
                .observe(self.bloom_filter_size as _);
        }

        if self.sstable_file_size != 0 {
            metrics
                .sstable_file_size
                .observe(self.sstable_file_size as _);
        }

        if self.avg_key_size != 0 {
            metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
        }

        if self.avg_value_size != 0 {
            metrics
                .sstable_avg_value_size
                .observe(self.avg_value_size as _);
        }

        if self.epoch_count != 0 {
            metrics
                .sstable_distinct_epoch_count
                .observe(self.epoch_count as _);
        }

        if !self.block_size_vec.is_empty() {
            for block_size in &self.block_size_vec {
                metrics.sstable_block_size.observe(*block_size as _);
            }
        }
    }
}
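
// Consumption sketch (hedged; `metrics` is assumed to be the compactor's shared
// `Arc<CompactorMetrics>` from the surrounding runtime):
//
//     let output = builder.finish().await?;
//     output.stats.report_stats(&metrics);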

#[cfg(test)]
pub(super) mod tests {
    use std::collections::{Bound, HashMap};

    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::UserKey;

    use super::*;
    use crate::assert_bytes_eq;
    use crate::compaction_catalog_manager::{
        CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
    };
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
        test_key_of, test_value_of,
    };
    use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
    use crate::monitor::StoreLocalStatistic;

    #[tokio::test]
    async fn test_empty() {
        let opt = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: 0.001,
            ..Default::default()
        };

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        b.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_basic() {
        let opt = default_builder_opt_for_test();

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let mut b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        for i in 0..TEST_KEYS_COUNT {
            b.add_for_test(
                test_key_of(i).to_ref(),
                HummockValue::put(&test_value_of(i)),
            )
            .await
            .unwrap();
        }

        let output = b.finish().await.unwrap();
        let info = output.sst_info.sst_info;

        assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
        assert_bytes_eq!(
            test_key_of(TEST_KEYS_COUNT - 1).encode(),
            info.key_range.right
        );
        let (data, meta) = output.writer_output;
        assert_eq!(info.file_size, meta.estimated_size as u64);
        let offset = info.meta_offset as usize;
        let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
        assert_eq!(meta2, meta);
    }

    async fn test_with_bloom_filter<F: FilterBuilder>(with_blooms: bool) {
        let key_count = 1000;

        let opts = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: if with_blooms { 0.01 } else { 0.0 },
            ..Default::default()
        };

        let sstable_store = mock_sstable_store().await;
        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
            opts,
            0,
            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        )
        .await;
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();

        assert_eq!(table.has_bloom_filter(), with_blooms);
        for i in 0..key_count {
            let full_key = test_key_of(i);
            if table.has_bloom_filter() {
                let hash = Sstable::hash_for_bloom_filter(full_key.user_key.encode().as_slice(), 0);
                let key_ref = full_key.user_key.as_ref();
                assert!(
                    table.may_match_hash(
                        &(Bound::Included(key_ref), Bound::Included(key_ref)),
                        hash
                    ),
                    "failed at {}",
                    i
                );
            }
        }
    }

    #[tokio::test]
    async fn test_bloom_filter() {
        test_with_bloom_filter::<Xor16FilterBuilder>(false).await;
        test_with_bloom_filter::<Xor16FilterBuilder>(true).await;
        test_with_bloom_filter::<Xor8FilterBuilder>(true).await;
        test_with_bloom_filter::<BlockedXor16FilterBuilder>(true).await;
    }

    #[tokio::test]
    async fn test_no_bloom_filter_block() {
        let opts = SstableBuilderOptions::default();
        let sstable_store = mock_sstable_store().await;
        let writer_opts = SstableWriterOptions::default();
        let object_id = 1;
        let writer = sstable_store
            .clone()
            .create_sst_writer(object_id, writer_opts);
        let mut filter = MultiFilterKeyExtractor::default();
        filter.register(
            1.into(),
            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
        );
        filter.register(
            2.into(),
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
        );
        filter.register(
            3.into(),
            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
        );

        let table_id_to_vnode = HashMap::from_iter(vec![
            (1.into(), VirtualNode::COUNT_FOR_TEST),
            (2.into(), VirtualNode::COUNT_FOR_TEST),
            (3.into(), VirtualNode::COUNT_FOR_TEST),
        ]);
        let table_id_to_watermark_serde =
            HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);

        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::Multi(filter),
            table_id_to_vnode,
            table_id_to_watermark_serde,
        ));

        let mut builder = SstableBuilder::new(
            object_id,
            writer,
            BlockedXor16FilterBuilder::new(1024),
            opts,
            compaction_catalog_agent_ref,
            None,
        );

        let key_count: usize = 10000;
        for table_id in 1..4 {
            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
            for idx in 0..key_count {
                table_key.resize(VirtualNode::SIZE, 0);
                table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
                let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
                let v = test_value_of(idx);
                builder
                    .add(
                        FullKey::from_user_key(k, test_epoch(1)),
                        HummockValue::put(v.as_ref()),
                    )
                    .await
                    .unwrap();
            }
        }
        let ret = builder.finish().await.unwrap();
        let sst_info = ret.sst_info.sst_info.clone();
        ret.writer_output.await.unwrap().unwrap();
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        for idx in 0..key_count {
            table_key.resize(VirtualNode::SIZE, 0);
            table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
            let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
            let hash = Sstable::hash_for_bloom_filter(&k.encode(), 2);
            let key_ref = k.as_ref();
            assert!(
                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
            );
        }
    }
}
949}