use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::time::SystemTime;

use bytes::{Bytes, BytesMut};
use risingwave_common::catalog::TableId;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, TABLE_PREFIX_LEN, user_key};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
use risingwave_pb::hummock::BloomFilterType;

use super::utils::CompressionAlgorithm;
use super::{
    BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
    DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
    FullKeyFilterKeyExtractor,
};
use crate::hummock::sstable::{FilterBuilder, utils};
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
    try_shorten_block_smallest_key,
};
use crate::monitor::CompactorMetrics;
use crate::opts::StorageOpts;

pub const DEFAULT_SSTABLE_SIZE: usize = 4 * 1024 * 1024;
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 * 1024 * 1024;
pub const MIN_BLOCK_SIZE: usize = 8 * 1024;

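/// Options for building an [`SstableBuilder`].
///
/// A minimal construction sketch; the literal sizes below are illustrative
/// assumptions, not recommended values:
///
/// ```ignore
/// let options = SstableBuilderOptions {
///     capacity: 32 * 1024 * 1024, // ~32 MiB target sstable size
///     block_capacity: 64 * 1024,  // ~64 KiB target block size
///     ..Default::default()
/// };
/// ```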
#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Approximate sstable capacity, in bytes.
    pub capacity: usize,
    /// Approximate block capacity, in bytes.
    pub block_capacity: usize,
    /// Restart point interval for key prefix compression within a block.
    pub restart_interval: usize,
    /// False positive rate of the bloom filter; a value of `0.0` disables it.
    pub bloom_false_positive: f64,
    /// Compression algorithm applied to each block.
    pub compression_algorithm: CompressionAlgorithm,
    /// Upper bound on the size of a single sstable, in bytes.
    pub max_sst_size: u64,
    /// When set, the smallest key recorded in a block's metadata may be
    /// shortened once the full key reaches this encoded length.
    pub shorten_block_meta_key_threshold: Option<usize>,
}

impl From<&StorageOpts> for SstableBuilderOptions {
    fn from(options: &StorageOpts) -> SstableBuilderOptions {
        let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
        SstableBuilderOptions {
            capacity,
            block_capacity: (options.block_size_kb as usize) * (1 << 10),
            restart_interval: DEFAULT_RESTART_INTERVAL,
            bloom_false_positive: options.bloom_false_positive,
            compression_algorithm: CompressionAlgorithm::None,
            max_sst_size: options.compactor_max_sst_size,
            shorten_block_meta_key_threshold: options.shorten_block_meta_key_threshold,
        }
    }
}

impl Default for SstableBuilderOptions {
    fn default() -> Self {
        Self {
            capacity: DEFAULT_SSTABLE_SIZE,
            block_capacity: DEFAULT_BLOCK_SIZE,
            restart_interval: DEFAULT_RESTART_INTERVAL,
            bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
            compression_algorithm: CompressionAlgorithm::None,
            max_sst_size: DEFAULT_MAX_SST_SIZE,
            shorten_block_meta_key_threshold: None,
        }
    }
}

pub struct SstableBuilderOutput<WO> {
    pub sst_info: LocalSstableInfo,
    pub writer_output: WO,
    pub stats: SstableBuilderOutputStats,
}

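/// Builds an sstable by accumulating key-value pairs into blocks and streaming
/// the encoded blocks through an [`SstableWriter`], while maintaining block
/// metadata, per-table statistics, and a filter built by `F`.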
pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Options.
    options: SstableBuilderOptions,
    /// Data writer.
    writer: W,
    /// Current block builder.
    block_builder: BlockBuilder,

    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Metadata of all blocks written so far.
    block_metas: Vec<BlockMeta>,

    /// `table_id`s of the keys added so far.
    table_ids: BTreeSet<TableId>,
    /// The last added full key, encoded.
    last_full_key: Vec<u8>,
    /// Reused buffers for the encoded key and value, to avoid repeated
    /// allocation.
    raw_key: BytesMut,
    raw_value: BytesMut,
    last_table_id: Option<TableId>,
    sst_object_id: HummockSstableObjectId,

    /// Per-table statistics, keyed by `table_id`.
    table_stats: TableStatsMap,
    /// Statistics of the table currently being written; flushed into
    /// `table_stats` whenever the table id changes and on `finish`.
    last_table_stats: TableStats,

    filter_builder: F,

    epoch_set: BTreeSet<u64>,
    memory_limiter: Option<Arc<MemoryLimiter>>,

    /// Size of each written block, collected for metrics.
    block_size_vec: Vec<usize>,
}

impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
    pub fn for_test(
        sstable_id: u64,
        writer: W,
        options: SstableBuilderOptions,
        table_id_to_vnode: HashMap<impl Into<TableId>, usize>,
        table_id_to_watermark_serde: HashMap<
            impl Into<TableId>,
            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
        >,
    ) -> Self {
        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
            table_id_to_vnode
                .into_iter()
                .map(|(table_id, v)| (table_id.into(), v))
                .collect(),
            table_id_to_watermark_serde
                .into_iter()
                .map(|(table_id, v)| (table_id.into(), v))
                .collect(),
            HashMap::default(),
        ));

        Self::new(
            sstable_id,
            writer,
            Xor16FilterBuilder::new(options.capacity / DEFAULT_ENTRY_SIZE + 1),
            options,
            compaction_catalog_agent_ref,
            None,
        )
    }
}

impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
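    /// Creates a new `SstableBuilder`.
    ///
    /// A sketch of the intended lifecycle, using names from this module (the
    /// argument values are assumptions for illustration):
    ///
    /// ```ignore
    /// let mut builder =
    ///     SstableBuilder::new(object_id, writer, filter_builder, options, agent, None);
    /// builder.add(full_key, HummockValue::put(value)).await?;
    /// let output = builder.finish().await?;
    /// ```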
    pub fn new(
        sst_object_id: impl Into<HummockSstableObjectId>,
        writer: W,
        filter_builder: F,
        options: SstableBuilderOptions,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        memory_limiter: Option<Arc<MemoryLimiter>>,
    ) -> Self {
        let sst_object_id = sst_object_id.into();
        Self {
            options: options.clone(),
            writer,
            block_builder: BlockBuilder::new(BlockBuilderOptions {
                capacity: options.block_capacity,
                restart_interval: options.restart_interval,
                compression_algorithm: options.compression_algorithm,
            }),
            filter_builder,
            block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
            table_ids: BTreeSet::new(),
            last_table_id: None,
            raw_key: BytesMut::new(),
            raw_value: BytesMut::new(),
            last_full_key: vec![],
            sst_object_id,
            compaction_catalog_agent_ref,
            table_stats: Default::default(),
            last_table_stats: Default::default(),
            epoch_set: BTreeSet::default(),
            memory_limiter,
            block_size_vec: Vec::new(),
        }
    }

    pub async fn add_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add(full_key, value).await
    }

    pub fn current_block_size(&self) -> usize {
        self.block_builder.approximate_len()
    }

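    /// Appends an already-encoded block to the sstable.
    ///
    /// Returns `Ok(true)` if the raw block was written as-is. If the block
    /// under construction is still smaller than `MIN_BLOCK_SIZE` (capped at a
    /// quarter of the block capacity), the incoming block is instead decoded
    /// and its entries re-added one by one, and `Ok(false)` is returned.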
    pub async fn add_raw_block(
        &mut self,
        buf: Bytes,
        filter_data: Vec<u8>,
        smallest_key: FullKey<Vec<u8>>,
        largest_key: Vec<u8>,
        mut meta: BlockMeta,
    ) -> HummockResult<bool> {
        let table_id = smallest_key.user_key.table_id;
        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }

            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
        }

        if !self.block_builder.is_empty() {
            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);

            if self.block_builder.approximate_len() < min_block_size {
                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
                iter.seek_to_first();
                while iter.is_valid() {
                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
                        panic!(
                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
                            self.sst_object_id, self.block_metas.len(), self.last_table_id
                        )
                    });
                    self.add_impl(iter.key(), value, false).await?;
                    iter.next();
                }
                return Ok(false);
            }

            self.build_block().await?;
        }
        self.last_full_key = largest_key;
        assert_eq!(
            meta.len as usize,
            buf.len(),
            "meta {} buf {} last_table_id {:?}",
            meta.len,
            buf.len(),
            self.last_table_id
        );
        meta.offset = self.writer.data_len() as u32;
        self.block_metas.push(meta);
        self.filter_builder.add_raw_data(filter_data);
        let block_meta = self.block_metas.last_mut().unwrap();
        self.writer.write_block_bytes(buf, block_meta).await?;

        Ok(true)
    }

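    /// Adds a key-value pair to the sstable.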
    pub async fn add(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add_impl(full_key, value, true).await
    }

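    /// Adds a key-value pair to the sstable.
    ///
    /// `could_switch_block` controls whether this call may seal the current
    /// block (or switch to a new table); it is `false` when entries are being
    /// re-added from `add_raw_block`.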
    async fn add_impl(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        could_switch_block: bool,
    ) -> HummockResult<()> {
        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;

        let table_key_len = full_key.user_key.table_key.as_ref().len();
        let table_value_len = match &value {
            HummockValue::Put(t) => t.len(),
            HummockValue::Delete => 0,
        };
        let large_value_len = self.options.max_sst_size as usize / 10;
        let large_key_value_len = self.options.max_sst_size as usize / 2;
        if table_key_len >= LARGE_KEY_LEN
            || table_value_len > large_value_len
            || table_key_len + table_value_len > large_key_value_len
        {
            let table_id = full_key.user_key.table_id;
            tracing::warn!(
                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
                table_id,
                table_key_len,
                table_value_len,
                full_key.epoch_with_gap.pure_epoch(),
                full_key.epoch_with_gap.offset(),
            );
        }

        full_key.encode_into(&mut self.raw_key);
        value.encode(&mut self.raw_value);
        let is_new_user_key = self.last_full_key.is_empty()
            || !user_key(&self.raw_key).eq(user_key(&self.last_full_key));
        let table_id = full_key.user_key.table_id;
        let is_new_table = self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id;
        let current_block_size = self.current_block_size();
        let is_block_full = current_block_size >= self.options.block_capacity
            || (current_block_size > self.options.block_capacity / 4 * 3
                && current_block_size + self.raw_value.len() + self.raw_key.len()
                    > self.options.block_capacity);

        if is_new_table {
            assert!(
                could_switch_block,
                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
                is_new_user_key,
                self.sst_object_id,
                self.block_metas.len(),
                table_id,
                self.last_table_id,
                full_key
            );
            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }
        } else if is_block_full && could_switch_block {
            self.build_block().await?;
        }
        self.last_table_stats.total_key_count += 1;
        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());

        // A new block is about to begin: record its metadata, possibly with a
        // shortened smallest key.
        if self.block_builder.is_empty() {
            let smallest_key = if let Some(threshold) =
                self.options.shorten_block_meta_key_threshold
                && !self.last_full_key.is_empty()
                && full_key.encoded_len() >= threshold
            {
                let prev = FullKey::decode(&self.last_full_key);
                if let Some(shortened) = try_shorten_block_smallest_key(&prev, &full_key) {
                    shortened.encode()
                } else {
                    full_key.encode()
                }
            } else {
                full_key.encode()
            };

            self.block_metas.push(BlockMeta {
                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
                    panic!(
                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
                        self.writer.data_len(),
                        self.sst_object_id,
                        self.block_metas.len(),
                        self.table_ids,
                    )
                }),
                len: 0,
                smallest_key,
                uncompressed_size: 0,
                total_key_count: 0,
                stale_key_count: 0,
            });
        }

        let table_id = full_key.user_key.table_id;
        let mut extract_key = user_key(&self.raw_key);
        extract_key = self.compaction_catalog_agent_ref.extract(extract_key);
        if !extract_key.is_empty() {
            self.filter_builder
                .add_key(extract_key, table_id.as_raw_id());
        }
        self.block_builder.add(
            table_id,
            &self.raw_key[TABLE_PREFIX_LEN..],
            self.raw_value.as_ref(),
        );
        self.block_metas.last_mut().unwrap().total_key_count += 1;
        if !is_new_user_key || value.is_delete() {
            self.block_metas.last_mut().unwrap().stale_key_count += 1;
        }
        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
        self.last_table_stats.total_value_size += value.encoded_len() as i64;

        self.last_full_key.clear();
        self.last_full_key.extend_from_slice(&self.raw_key);

        self.raw_key.clear();
        self.raw_value.clear();
        Ok(())
    }

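    /// Finishes building the sstable: seals the last block, serializes the
    /// filter and [`SstableMeta`], and returns the resulting [`SstableInfo`]
    /// together with the writer output and builder statistics.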
    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
        let smallest_key = if self.block_metas.is_empty() {
            vec![]
        } else {
            self.block_metas[0].smallest_key.clone()
        };
        let largest_key = self.last_full_key.clone();
        self.finalize_last_table_stats();

        self.build_block().await?;
        let right_exclusive = false;
        let meta_offset = self.writer.data_len() as u64;

        let bloom_filter_kind = if self.filter_builder.support_blocked_raw_data() {
            BloomFilterType::Blocked
        } else {
            BloomFilterType::Sstable
        };
        let bloom_filter = if self.options.bloom_false_positive > 0.0 {
            self.filter_builder.finish(self.memory_limiter.clone())
        } else {
            vec![]
        };

        let total_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.total_key_count as u64)
            .sum::<u64>();
        let stale_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.stale_key_count as u64)
            .sum::<u64>();
        let uncompressed_file_size = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.uncompressed_size as u64)
            .sum::<u64>();

        #[expect(deprecated)]
        let mut meta = SstableMeta {
            block_metas: self.block_metas,
            bloom_filter,
            estimated_size: 0,
            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
                    total_key_count, self.table_ids,
                )
            }),
            smallest_key,
            largest_key,
            version: VERSION,
            meta_offset,
            monotonic_tombstone_events: vec![],
        };

        let meta_encode_size = meta.encoded_size();
        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
                meta_encode_size, self.table_ids,
            )
        });
        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
                meta_offset, self.table_ids,
            )
        });
        meta.estimated_size = encoded_size_u32
            .checked_add(meta_offset_u32)
            .unwrap_or_else(|| {
                panic!(
                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
                )
            });

        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
            (0, 0)
        } else {
            let total_key_count: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_count as usize)
                .sum();

            if total_key_count == 0 {
                (0, 0)
            } else {
                let total_key_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_key_size as usize)
                    .sum();

                let total_value_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_value_size as usize)
                    .sum();

                (
                    total_key_size / total_key_count,
                    total_value_size / total_key_count,
                )
            }
        };

        let (min_epoch, max_epoch) = {
            if self.epoch_set.is_empty() {
                (HummockEpoch::MAX, u64::MIN)
            } else {
                (
                    *self.epoch_set.first().unwrap(),
                    *self.epoch_set.last().unwrap(),
                )
            }
        };

        let sst_info: SstableInfo = SstableInfoInner {
            object_id: self.sst_object_id,
            sst_id: self.sst_object_id.inner().into(),
            bloom_filter_kind,
            key_range: KeyRange {
                left: Bytes::from(meta.smallest_key.clone()),
                right: Bytes::from(meta.largest_key.clone()),
                right_exclusive,
            },
            file_size: meta.estimated_size as u64,
            table_ids: self.table_ids.into_iter().collect(),
            meta_offset: meta.meta_offset,
            stale_key_count,
            total_key_count,
            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
            min_epoch,
            max_epoch,
            range_tombstone_count: 0,
            sst_size: meta.estimated_size as u64,
        }
        .into();
        tracing::trace!(
            "meta_size {} bloom_filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
            meta.encoded_size(),
            meta.bloom_filter.len(),
            total_key_count,
            stale_key_count,
            min_epoch,
            max_epoch,
            self.epoch_set.len()
        );
        let bloom_filter_size = meta.bloom_filter.len();
        let sstable_file_size = sst_info.file_size as usize;

        if !meta.block_metas.is_empty() {
            // Attribute the compressed size of each block to the stats of the
            // table it belongs to.
            let mut last_table_id = meta.block_metas[0].table_id();
            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
            for block_meta in &meta.block_metas {
                let block_table_id = block_meta.table_id();
                if last_table_id != block_table_id {
                    last_table_id = block_table_id;
                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
                }

                last_table_stats.total_compressed_size += block_meta.len as u64;
            }
        }

        let writer_output = self.writer.finish(meta).await?;
        // Record the creation time, in seconds since the Unix epoch.
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        Ok(SstableBuilderOutput::<W::Output> {
            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
            writer_output,
            stats: SstableBuilderOutputStats {
                bloom_filter_size,
                avg_key_size,
                avg_value_size,
                epoch_count: self.epoch_set.len(),
                block_size_vec: self.block_size_vec,
                sstable_file_size,
            },
        })
    }

    pub fn approximate_len(&self) -> usize {
        self.writer.data_len()
            + self.block_builder.approximate_len()
            + self.filter_builder.approximate_len()
    }

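    /// Seals the block under construction: writes it out through the writer,
    /// fills in its `BlockMeta` (uncompressed size and on-disk length), and
    /// resets the block builder. A no-op when the current block is empty.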
    pub async fn build_block(&mut self) -> HummockResult<()> {
        if self.block_builder.is_empty() {
            return Ok(());
        }

        let block_meta = self.block_metas.last_mut().unwrap();
        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
            .unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
                    uncompressed_block_size,
                    self.block_builder.table_id(),
                )
            });
        let block = self.block_builder.build();
        self.writer.write_block(block, block_meta).await?;
        self.block_size_vec.push(block.len());
        self.filter_builder
            .switch_block(self.memory_limiter.clone());
        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
                self.writer.data_len(),
                self.block_builder.table_id(),
            )
        });
        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
            panic!(
                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
                data_len, block_meta.offset
            )
        });

        if data_len as usize > self.options.capacity * 2 {
            tracing::warn!(
                "WARN unexpected block size {} table {:?}",
                data_len,
                self.block_builder.table_id()
            );
        }

        self.block_builder.clear();
        Ok(())
    }

    /// Returns `true` if nothing has been written to the data writer yet.
    pub fn is_empty(&self) -> bool {
        self.writer.data_len() == 0
    }

    /// Returns `true` if the builder has roughly reached its target capacity.
    pub fn reach_capacity(&self) -> bool {
        self.approximate_len() >= self.options.capacity
    }

    fn finalize_last_table_stats(&mut self) {
        if self.table_ids.is_empty() || self.last_table_id.is_none() {
            return;
        }
        self.table_stats.insert(
            self.last_table_id.unwrap(),
            std::mem::take(&mut self.last_table_stats),
        );
    }
}

pub struct SstableBuilderOutputStats {
    bloom_filter_size: usize,
    avg_key_size: usize,
    avg_value_size: usize,
    epoch_count: usize,
    block_size_vec: Vec<usize>,
    sstable_file_size: usize,
}

impl SstableBuilderOutputStats {
    pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
        if self.bloom_filter_size != 0 {
            metrics
                .sstable_bloom_filter_size
                .observe(self.bloom_filter_size as _);
        }

        if self.sstable_file_size != 0 {
            metrics
                .sstable_file_size
                .observe(self.sstable_file_size as _);
        }

        if self.avg_key_size != 0 {
            metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
        }

        if self.avg_value_size != 0 {
            metrics
                .sstable_avg_value_size
                .observe(self.avg_value_size as _);
        }

        if self.epoch_count != 0 {
            metrics
                .sstable_distinct_epoch_count
                .observe(self.epoch_count as _);
        }

        for block_size in &self.block_size_vec {
            metrics.sstable_block_size.observe(*block_size as _);
        }
    }
}

#[cfg(test)]
pub(super) mod tests {
    use std::collections::{Bound, HashMap};

    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::UserKey;

    use super::*;
    use crate::assert_bytes_eq;
    use crate::compaction_catalog_manager::{
        CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
    };
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
        test_key_of, test_value_of,
    };
    use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
    use crate::monitor::StoreLocalStatistic;

    #[tokio::test]
    async fn test_empty() {
        let opt = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: 0.001,
            ..Default::default()
        };

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        b.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_basic() {
        let opt = default_builder_opt_for_test();

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let mut b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        for i in 0..TEST_KEYS_COUNT {
            b.add_for_test(
                test_key_of(i).to_ref(),
                HummockValue::put(&test_value_of(i)),
            )
            .await
            .unwrap();
        }

        let output = b.finish().await.unwrap();
        let info = output.sst_info.sst_info;

        assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
        assert_bytes_eq!(
            test_key_of(TEST_KEYS_COUNT - 1).encode(),
            info.key_range.right
        );
        let (data, meta) = output.writer_output;
        assert_eq!(info.file_size, meta.estimated_size as u64);
        let offset = info.meta_offset as usize;
        let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
        assert_eq!(meta2, meta);
    }

    async fn test_with_bloom_filter<F: FilterBuilder>(with_blooms: bool) {
        let key_count = 1000;

        let opts = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: if with_blooms { 0.01 } else { 0.0 },
            ..Default::default()
        };

        let sstable_store = mock_sstable_store().await;
        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
            opts,
            0,
            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        )
        .await;
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();

        assert_eq!(table.has_bloom_filter(), with_blooms);
        for i in 0..key_count {
            let full_key = test_key_of(i);
            if table.has_bloom_filter() {
                let hash = Sstable::hash_for_bloom_filter(full_key.user_key.encode().as_slice(), 0);
                let key_ref = full_key.user_key.as_ref();
                assert!(
                    table.may_match_hash(
                        &(Bound::Included(key_ref), Bound::Included(key_ref)),
                        hash
                    ),
                    "failed at {}",
                    i
                );
            }
        }
    }

    #[tokio::test]
    async fn test_bloom_filter() {
        test_with_bloom_filter::<Xor16FilterBuilder>(false).await;
        test_with_bloom_filter::<Xor16FilterBuilder>(true).await;
        test_with_bloom_filter::<Xor8FilterBuilder>(true).await;
        test_with_bloom_filter::<BlockedXor16FilterBuilder>(true).await;
    }

    #[tokio::test]
    async fn test_no_bloom_filter_block() {
        let opts = SstableBuilderOptions::default();
        let sstable_store = mock_sstable_store().await;
        let writer_opts = SstableWriterOptions::default();
        let object_id = 1;
        let writer = sstable_store
            .clone()
            .create_sst_writer(object_id, writer_opts);
        let mut filter = MultiFilterKeyExtractor::default();
        filter.register(
            1.into(),
            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
        );
        filter.register(
            2.into(),
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
        );
        filter.register(
            3.into(),
            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
        );

        let table_id_to_vnode = HashMap::from_iter(vec![
            (1.into(), VirtualNode::COUNT_FOR_TEST),
            (2.into(), VirtualNode::COUNT_FOR_TEST),
            (3.into(), VirtualNode::COUNT_FOR_TEST),
        ]);
        let table_id_to_watermark_serde =
            HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);

        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::Multi(filter),
            table_id_to_vnode,
            table_id_to_watermark_serde,
            HashMap::default(),
        ));

        let mut builder = SstableBuilder::new(
            object_id,
            writer,
            BlockedXor16FilterBuilder::new(1024),
            opts,
            compaction_catalog_agent_ref,
            None,
        );

        let key_count: usize = 10000;
        for table_id in 1..4 {
            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
            for idx in 0..key_count {
                table_key.resize(VirtualNode::SIZE, 0);
                table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
                let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
                let v = test_value_of(idx);
                builder
                    .add(
                        FullKey::from_user_key(k, test_epoch(1)),
                        HummockValue::put(v.as_ref()),
                    )
                    .await
                    .unwrap();
            }
        }
        let ret = builder.finish().await.unwrap();
        let sst_info = ret.sst_info.sst_info.clone();
        ret.writer_output.await.unwrap().unwrap();
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        for idx in 0..key_count {
            table_key.resize(VirtualNode::SIZE, 0);
            table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
            let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
            let hash = Sstable::hash_for_bloom_filter(&k.encode(), 2);
            let key_ref = k.as_ref();
            assert!(
                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
            );
        }
    }
}