1use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::mem;
17use std::sync::Arc;
18use std::time::SystemTime;
19
20use bytes::{Bytes, BytesMut};
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, TABLE_PREFIX_LEN, UserKey, user_key};
25use risingwave_hummock_sdk::key_range::KeyRange;
26use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner, VnodeStatistics};
27use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
28use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
29use risingwave_pb::hummock::{BloomFilterType, PbSstableFilterType};
30
31use super::utils::CompressionAlgorithm;
32use super::{
33 BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
34 DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
35};
36use crate::compaction_catalog_manager::{
37 CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
38 FullKeyFilterKeyExtractor,
39};
40use crate::hummock::sstable::{
41 DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP, FilterBuilder, FilterBuilderOptions, utils,
42};
43use crate::hummock::value::HummockValue;
44use crate::hummock::{
45 Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
46 try_shorten_block_smallest_key,
47};
48use crate::monitor::CompactorMetrics;
49use crate::opts::StorageOpts;
50
/// Default SST capacity in bytes (4 MiB); used by `SstableBuilderOptions::default()`.
pub const DEFAULT_SSTABLE_SIZE: usize = 4 << 20;
/// Default false-positive rate stored in `SstableBuilderOptions::bloom_false_positive`.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 1e-3;
/// Default `max_sst_size` in bytes (512 MiB).
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 << 20;
/// Upper bound in bytes (8 KiB) of the "pending block is too small" threshold used when a raw
/// block is appended.
pub const MIN_BLOCK_SIZE: usize = 8 << 10;
55
56#[derive(Clone, Debug)]
57pub struct SstableBuilderOptions {
58 pub capacity: usize,
60 pub block_capacity: usize,
62 pub restart_interval: usize,
64 pub bloom_false_positive: f64,
66 pub compression_algorithm: CompressionAlgorithm,
68 pub max_sst_size: u64,
69 pub shorten_block_meta_key_threshold: Option<usize>,
71 pub max_vnode_key_range_bytes: Option<usize>,
73 pub estimated_output_key_count: Option<usize>,
75 pub filter_hash_prealloc_key_count_cap: usize,
77}
78
79impl From<&StorageOpts> for SstableBuilderOptions {
80 fn from(options: &StorageOpts) -> SstableBuilderOptions {
81 let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
82 SstableBuilderOptions {
83 capacity,
84 block_capacity: (options.block_size_kb as usize) * (1 << 10),
85 restart_interval: DEFAULT_RESTART_INTERVAL,
86 bloom_false_positive: options.bloom_false_positive,
87 compression_algorithm: CompressionAlgorithm::None,
88 max_sst_size: options.compactor_max_sst_size,
89 shorten_block_meta_key_threshold: options.shorten_block_meta_key_threshold,
90 max_vnode_key_range_bytes: None,
91 estimated_output_key_count: None,
92 filter_hash_prealloc_key_count_cap: DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
93 }
94 }
95}
96
97impl Default for SstableBuilderOptions {
98 fn default() -> Self {
99 Self {
100 capacity: DEFAULT_SSTABLE_SIZE,
101 block_capacity: DEFAULT_BLOCK_SIZE,
102 restart_interval: DEFAULT_RESTART_INTERVAL,
103 bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
104 compression_algorithm: CompressionAlgorithm::None,
105 max_sst_size: DEFAULT_MAX_SST_SIZE,
106 shorten_block_meta_key_threshold: None,
107 max_vnode_key_range_bytes: None,
108 estimated_output_key_count: None,
109 filter_hash_prealloc_key_count_cap: DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
110 }
111 }
112}
113
114impl SstableBuilderOptions {
115 pub fn estimated_output_key_count(&self) -> usize {
116 self.estimated_output_key_count
117 .unwrap_or(self.capacity / DEFAULT_ENTRY_SIZE + 1)
118 }
119
120 pub fn filter_builder_options(&self) -> FilterBuilderOptions {
121 FilterBuilderOptions {
122 estimated_key_count: self.estimated_output_key_count(),
123 estimated_block_count: self.capacity / self.block_capacity + 1,
124 hash_prealloc_key_count_cap: self.filter_hash_prealloc_key_count_cap,
125 }
126 }
127}
128
129pub struct SstableBuilderOutput<WO> {
130 pub sst_info: LocalSstableInfo,
131 pub writer_output: WO,
132 pub stats: SstableBuilderOutputStats,
133}
134
135pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
136 options: SstableBuilderOptions,
138 writer: W,
140 block_builder: BlockBuilder,
142
143 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
144 block_metas: Vec<BlockMeta>,
146
147 table_ids: BTreeSet<TableId>,
149 last_full_key: Vec<u8>,
150 raw_key: BytesMut,
152 raw_value: BytesMut,
153 last_table_id: Option<TableId>,
154 sst_object_id: HummockSstableObjectId,
155
156 table_stats: TableStatsMap,
158 last_table_stats: TableStats,
161
162 filter_builder: F,
163
164 epoch_set: BTreeSet<u64>,
165 memory_limiter: Option<Arc<MemoryLimiter>>,
166
167 block_size_vec: Vec<usize>, vnode_range_collector: Option<VnodeUserKeyRangeCollector>,
169}
170
171impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
172 pub fn for_test(
173 sstable_id: u64,
174 writer: W,
175 options: SstableBuilderOptions,
176 table_id_to_vnode: HashMap<impl Into<TableId>, usize>,
177 table_id_to_watermark_serde: HashMap<
178 impl Into<TableId>,
179 Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
180 >,
181 ) -> Self {
182 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
183 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
184 table_id_to_vnode
185 .into_iter()
186 .map(|(table_id, v)| (table_id.into(), v))
187 .collect(),
188 table_id_to_watermark_serde
189 .into_iter()
190 .map(|(table_id, v)| (table_id.into(), v))
191 .collect(),
192 HashMap::default(),
193 ));
194
195 Self::new(
196 sstable_id,
197 writer,
198 Xor16FilterBuilder::create(options.filter_builder_options()),
199 options,
200 compaction_catalog_agent_ref,
201 None,
202 )
203 }
204}
205
206impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
207 pub fn new(
208 sst_object_id: impl Into<HummockSstableObjectId>,
209 writer: W,
210 filter_builder: F,
211 options: SstableBuilderOptions,
212 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
213 memory_limiter: Option<Arc<MemoryLimiter>>,
214 ) -> Self {
215 let sst_object_id = sst_object_id.into();
216 Self {
217 vnode_range_collector: VnodeUserKeyRangeCollector::with_limit(
218 options.max_vnode_key_range_bytes,
219 ),
220 options: options.clone(),
221 writer,
222 block_builder: BlockBuilder::new(BlockBuilderOptions {
223 capacity: options.block_capacity,
224 restart_interval: options.restart_interval,
225 compression_algorithm: options.compression_algorithm,
226 }),
227 filter_builder,
228 block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
229 table_ids: BTreeSet::new(),
230 last_table_id: None,
231 raw_key: BytesMut::new(),
232 raw_value: BytesMut::new(),
233 last_full_key: vec![],
234 sst_object_id,
235 compaction_catalog_agent_ref,
236 table_stats: Default::default(),
237 last_table_stats: Default::default(),
238 epoch_set: BTreeSet::default(),
239 memory_limiter,
240 block_size_vec: Vec::new(),
241 }
242 }
243
244 pub async fn add_for_test(
246 &mut self,
247 full_key: FullKey<&[u8]>,
248 value: HummockValue<&[u8]>,
249 ) -> HummockResult<()> {
250 self.add(full_key, value).await
251 }
252
253 pub fn current_block_size(&self) -> usize {
254 self.block_builder.approximate_len()
255 }
256
257 pub async fn add_raw_block(
259 &mut self,
260 buf: Bytes,
261 filter_data: Vec<u8>,
262 smallest_key: FullKey<Vec<u8>>,
263 largest_key: Vec<u8>,
264 mut meta: BlockMeta,
265 ) -> HummockResult<bool> {
266 let table_id = smallest_key.user_key.table_id;
267 if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
268 if !self.block_builder.is_empty() {
269 self.build_block().await?;
271 }
272
273 self.table_ids.insert(table_id);
274 self.finalize_last_table_stats();
275 self.last_table_id = Some(table_id);
276 }
277
278 if !self.block_builder.is_empty() {
279 let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);
280
281 if self.block_builder.approximate_len() < min_block_size {
283 let block = Block::decode(buf, meta.uncompressed_size as usize)?;
284 let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
285 iter.seek_to_first();
286 while iter.is_valid() {
287 let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
288 panic!(
289 "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
290 self.sst_object_id, self.block_metas.len(), self.last_table_id
291 )
292 });
293 self.add_impl(iter.key(), value, false).await?;
294 iter.next();
295 }
296 return Ok(false);
297 }
298
299 self.build_block().await?;
300 }
301 self.last_full_key = largest_key;
302 assert_eq!(
303 meta.len as usize,
304 buf.len(),
305 "meta {} buf {} last_table_id {:?}",
306 meta.len,
307 buf.len(),
308 self.last_table_id
309 );
310 meta.offset = self.writer.data_len() as u32;
311 self.block_metas.push(meta);
312 self.filter_builder.add_raw_data(filter_data);
313 let block_meta = self.block_metas.last_mut().unwrap();
314 self.writer.write_block_bytes(buf, block_meta).await?;
315
316 Ok(true)
317 }
318
319 pub async fn add(
321 &mut self,
322 full_key: FullKey<&[u8]>,
323 value: HummockValue<&[u8]>,
324 ) -> HummockResult<()> {
325 self.add_impl(full_key, value, true).await
326 }
327
328 async fn add_impl(
330 &mut self,
331 full_key: FullKey<&[u8]>,
332 value: HummockValue<&[u8]>,
333 could_switch_block: bool,
334 ) -> HummockResult<()> {
335 const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;
336
337 let table_key_len = full_key.user_key.table_key.as_ref().len();
338 let table_value_len = match &value {
339 HummockValue::Put(t) => t.len(),
340 HummockValue::Delete => 0,
341 };
342 let large_value_len = self.options.max_sst_size as usize / 10;
343 let large_key_value_len = self.options.max_sst_size as usize / 2;
344 if table_key_len >= LARGE_KEY_LEN
345 || table_value_len > large_value_len
346 || table_key_len + table_value_len > large_key_value_len
347 {
348 let table_id = full_key.user_key.table_id;
349 tracing::warn!(
350 "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
351 table_id,
352 table_key_len,
353 table_value_len,
354 full_key.epoch_with_gap.pure_epoch(),
355 full_key.epoch_with_gap.offset(),
356 );
357 }
358
359 full_key.encode_into(&mut self.raw_key);
361 value.encode(&mut self.raw_value);
362 let is_new_user_key = self.last_full_key.is_empty()
363 || !user_key(&self.raw_key).eq(user_key(self.last_full_key.as_slice()));
364 let table_id = full_key.user_key.table_id;
365 let is_new_table = self.last_table_id != Some(table_id);
366 let current_block_size = self.current_block_size();
367 let is_block_full = current_block_size >= self.options.block_capacity
368 || (current_block_size > self.options.block_capacity / 4 * 3
369 && current_block_size + self.raw_value.len() + self.raw_key.len()
370 > self.options.block_capacity);
371
372 if is_new_table {
373 assert!(
374 could_switch_block,
375 "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
376 is_new_user_key,
377 self.sst_object_id,
378 self.block_metas.len(),
379 table_id,
380 self.last_table_id,
381 full_key
382 );
383 self.table_ids.insert(table_id);
384 self.finalize_last_table_stats();
385 self.last_table_id = Some(table_id);
386 if !self.block_builder.is_empty() {
387 self.build_block().await?;
388 }
389 } else if is_block_full && could_switch_block {
390 self.build_block().await?;
391 }
392 self.last_table_stats.total_key_count += 1;
393 self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());
394
395 if self.block_builder.is_empty() {
397 let smallest_key = if let Some(threshold) =
398 self.options.shorten_block_meta_key_threshold
399 && !self.last_full_key.is_empty()
400 && full_key.encoded_len() >= threshold
401 {
402 let prev = FullKey::decode(&self.last_full_key);
403 if let Some(shortened) = try_shorten_block_smallest_key(&prev, &full_key) {
404 shortened.encode()
405 } else {
406 full_key.encode()
407 }
408 } else {
409 full_key.encode()
410 };
411
412 self.block_metas.push(BlockMeta {
413 offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
414 panic!(
415 "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
416 self.writer.data_len(),
417 self.sst_object_id,
418 self.block_metas.len(),
419 self.table_ids,
420 )
421 }),
422 len: 0,
423 smallest_key,
424 uncompressed_size: 0,
425 total_key_count: 0,
426 stale_key_count: 0,
427 });
428 }
429
430 let filter_key = self
431 .compaction_catalog_agent_ref
432 .extract(user_key(&self.raw_key));
433 if !filter_key.is_empty() {
435 self.filter_builder
436 .add_key(filter_key, table_id.as_raw_id());
437 }
438 self.block_builder.add(
440 table_id,
441 &self.raw_key[TABLE_PREFIX_LEN..],
442 self.raw_value.as_ref(),
443 );
444 self.block_metas.last_mut().unwrap().total_key_count += 1;
445 if !is_new_user_key || value.is_delete() {
446 self.block_metas.last_mut().unwrap().stale_key_count += 1;
447 }
448 self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
449 self.last_table_stats.total_value_size += value.encoded_len() as i64;
450
451 if let Some(collector) = self.vnode_range_collector.as_mut() {
452 collector.observe_key(
453 VirtualNode::from_index(full_key.user_key.get_vnode_id()),
454 &self.raw_key,
455 &self.last_full_key,
456 );
457 }
458
459 self.last_full_key.clear();
460 self.last_full_key.extend_from_slice(&self.raw_key);
461
462 self.raw_key.clear();
463 self.raw_value.clear();
464 Ok(())
465 }
466
467 pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
477 let smallest_key = if self.block_metas.is_empty() {
478 vec![]
479 } else {
480 self.block_metas[0].smallest_key.clone()
481 };
482 let largest_key = self.last_full_key.clone();
483 self.finalize_last_table_stats();
484
485 assert!(
488 self.table_ids.len() <= 1 || self.vnode_range_collector.is_none(),
489 "vnode key-range hints are only supported for single-table SSTs, found {} tables",
490 self.table_ids.len()
491 );
492
493 self.build_block().await?;
494 let right_exclusive = false;
495 let meta_offset = self.writer.data_len() as u64;
496
497 let filter_data = self.filter_builder.finish(self.memory_limiter.clone());
498 let (bloom_filter_kind, filter_type) = if filter_data.is_empty() {
499 (
500 BloomFilterType::BloomFilterUnspecified,
501 PbSstableFilterType::SstableFilterNone,
502 )
503 } else if self.filter_builder.support_blocked_raw_data() {
504 (BloomFilterType::Blocked, self.filter_builder.filter_type())
505 } else {
506 (BloomFilterType::Sstable, self.filter_builder.filter_type())
507 };
508
509 let total_key_count = self
510 .block_metas
511 .iter()
512 .map(|block_meta| block_meta.total_key_count as u64)
513 .sum::<u64>();
514 let stale_key_count = self
515 .block_metas
516 .iter()
517 .map(|block_meta| block_meta.stale_key_count as u64)
518 .sum::<u64>();
519 let uncompressed_file_size = self
520 .block_metas
521 .iter()
522 .map(|block_meta| block_meta.uncompressed_size as u64)
523 .sum::<u64>();
524
525 #[expect(deprecated)]
526 let mut meta = SstableMeta {
527 block_metas: self.block_metas,
528 bloom_filter: filter_data,
529 estimated_size: 0,
530 key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
531 panic!(
532 "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
533 total_key_count, self.table_ids,
534 )
535 }),
536 smallest_key,
537 largest_key,
538 version: VERSION,
539 meta_offset,
540 monotonic_tombstone_events: vec![],
541 };
542
543 let meta_encode_size = meta.encoded_size();
544 let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
545 panic!(
546 "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
547 meta_encode_size, self.table_ids,
548 )
549 });
550 let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
551 panic!(
552 "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
553 meta_offset, self.table_ids,
554 )
555 });
556 meta.estimated_size = encoded_size_u32
557 .checked_add(meta_offset_u32)
558 .unwrap_or_else(|| {
559 panic!(
560 "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
561 encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
562 )
563 });
564
565 let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
566 (0, 0)
567 } else {
568 let total_key_count: usize = self
569 .table_stats
570 .values()
571 .map(|s| s.total_key_count as usize)
572 .sum();
573
574 if total_key_count == 0 {
575 (0, 0)
576 } else {
577 let total_key_size: usize = self
578 .table_stats
579 .values()
580 .map(|s| s.total_key_size as usize)
581 .sum();
582
583 let total_value_size: usize = self
584 .table_stats
585 .values()
586 .map(|s| s.total_value_size as usize)
587 .sum();
588
589 (
590 total_key_size / total_key_count,
591 total_value_size / total_key_count,
592 )
593 }
594 };
595
596 let (min_epoch, max_epoch) = {
597 if self.epoch_set.is_empty() {
598 (HummockEpoch::MAX, u64::MIN)
599 } else {
600 (
601 *self.epoch_set.first().unwrap(),
602 *self.epoch_set.last().unwrap(),
603 )
604 }
605 };
606
607 let vnode_user_key_ranges = self
608 .vnode_range_collector
609 .take()
610 .and_then(|collector| collector.finish(&self.last_full_key));
611
612 let sst_info: SstableInfo = SstableInfoInner {
613 object_id: self.sst_object_id,
614 sst_id: self.sst_object_id.as_raw_id().into(),
616 bloom_filter_kind,
617 key_range: KeyRange {
618 left: Bytes::from(meta.smallest_key.clone()),
619 right: Bytes::from(meta.largest_key.clone()),
620 right_exclusive,
621 },
622 file_size: meta.estimated_size as u64,
623 table_ids: self.table_ids.into_iter().collect(),
624 meta_offset: meta.meta_offset,
625 stale_key_count,
626 total_key_count,
627 uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
628 min_epoch,
629 max_epoch,
630 range_tombstone_count: 0,
631 sst_size: meta.estimated_size as u64,
632 filter_type,
633 vnode_statistics: vnode_user_key_ranges,
634 }
635 .into();
636
637 tracing::trace!(
638 "meta_size {} filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
639 meta.encoded_size(),
640 meta.bloom_filter.len(),
641 total_key_count,
642 stale_key_count,
643 min_epoch,
644 max_epoch,
645 self.epoch_set.len()
646 );
647 let filter_size = meta.bloom_filter.len();
648 let sstable_file_size = sst_info.file_size as usize;
649
650 if !meta.block_metas.is_empty() {
651 let mut last_table_id = meta.block_metas[0].table_id();
653 let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
654 for block_meta in &meta.block_metas {
655 let block_table_id = block_meta.table_id();
656 if last_table_id != block_table_id {
657 last_table_id = block_table_id;
658 last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
659 }
660
661 last_table_stats.total_compressed_size += block_meta.len as u64;
662 }
663 }
664
665 let writer_output = self.writer.finish(meta).await?;
666 let now = SystemTime::now()
675 .duration_since(SystemTime::UNIX_EPOCH)
676 .expect("Clock may have gone backwards")
677 .as_secs();
678 Ok(SstableBuilderOutput::<W::Output> {
679 sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
680 writer_output,
681 stats: SstableBuilderOutputStats {
682 filter_size,
683 avg_key_size,
684 avg_value_size,
685 epoch_count: self.epoch_set.len(),
686 block_size_vec: self.block_size_vec,
687 sstable_file_size,
688 },
689 })
690 }
691
692 pub fn approximate_len(&self) -> usize {
693 self.writer.data_len()
694 + self.block_builder.approximate_len()
695 + self.filter_builder.approximate_len()
696 }
697
698 pub async fn build_block(&mut self) -> HummockResult<()> {
699 if self.block_builder.is_empty() {
701 return Ok(());
702 }
703
704 let block_meta = self.block_metas.last_mut().unwrap();
705 let uncompressed_block_size = self.block_builder.uncompressed_block_size();
706 block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
707 .unwrap_or_else(|_| {
708 panic!(
709 "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
710 uncompressed_block_size,
711 self.block_builder.table_id(),
712 )
713 });
714 let block = self.block_builder.build();
715 self.writer.write_block(block, block_meta).await?;
716 self.block_size_vec.push(block.len());
717 let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
718 panic!(
719 "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
720 self.writer.data_len(),
721 self.block_builder.table_id(),
722 )
723 });
724 block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
725 panic!(
726 "data_len should >= meta_offset, found data_len={}, meta_offset={}",
727 data_len, block_meta.offset
728 )
729 });
730
731 self.filter_builder
732 .switch_block(self.memory_limiter.clone());
733
734 if data_len as usize > self.options.capacity * 2 {
735 tracing::warn!(
736 "WARN unexpected block size {} table {:?}",
737 data_len,
738 self.block_builder.table_id()
739 );
740 }
741
742 self.block_builder.clear();
743 Ok(())
744 }
745
746 pub fn is_empty(&self) -> bool {
747 self.writer.data_len() > 0
748 }
749
750 pub fn reach_capacity(&self) -> bool {
752 self.approximate_len() >= self.options.capacity
753 }
754
755 fn finalize_last_table_stats(&mut self) {
756 if self.table_ids.is_empty() || self.last_table_id.is_none() {
757 return;
758 }
759 self.table_stats.insert(
760 self.last_table_id.unwrap(),
761 std::mem::take(&mut self.last_table_stats),
762 );
763 }
764}
765
766struct VnodeUserKeyRangeCollector {
768 max_bytes: usize,
769 current_size: usize,
770 ranges: BTreeMap<VirtualNode, (UserKey<Bytes>, UserKey<Bytes>)>,
771 current_vnode: VirtualNode,
772 range_start_key: Vec<u8>,
773}
774
775impl VnodeUserKeyRangeCollector {
776 fn new(max_bytes: usize) -> Self {
777 Self {
778 max_bytes,
779 current_size: 0,
780 ranges: BTreeMap::new(),
781 current_vnode: VirtualNode::ZERO,
782 range_start_key: Vec::new(),
783 }
784 }
785
786 fn with_limit(max_bytes: Option<usize>) -> Option<Self> {
787 max_bytes.filter(|&n| n > 0).map(Self::new)
788 }
789
790 fn observe_key(&mut self, vnode: VirtualNode, key: &[u8], prev_key: &[u8]) {
793 if self.current_size >= self.max_bytes {
794 return;
795 }
796
797 if self.range_start_key.is_empty() {
799 self.current_vnode = vnode;
800 self.range_start_key = key.to_vec();
801 return;
802 }
803
804 if vnode == self.current_vnode {
806 return;
807 }
808
809 self.seal_range(prev_key);
811
812 if self.current_size >= self.max_bytes {
814 return;
815 }
816
817 self.current_vnode = vnode;
819 self.range_start_key = key.to_vec();
820 }
821
822 fn seal_range(&mut self, right_key: &[u8]) {
824 let left_key = mem::take(&mut self.range_start_key);
825 self.current_size += mem::size_of::<VirtualNode>() + left_key.len() + right_key.len();
826
827 let left_full_key = FullKey::decode(&left_key);
828 let right_full_key = FullKey::decode(right_key);
829 let left_user_key = left_full_key.user_key.copy_into();
830 let right_user_key = right_full_key.user_key.copy_into();
831
832 assert_eq!(
837 left_user_key.get_vnode_id(),
838 right_user_key.get_vnode_id(),
839 "vnode changed within range: left_user {:?}, right_user {:?}",
840 left_user_key,
841 right_user_key
842 );
843 assert_eq!(
844 left_user_key.get_vnode_id(),
845 self.current_vnode.to_index(),
846 "vnode mismatch: left {:?}, right {:?}, expected vnode {}",
847 left_user_key,
848 right_user_key,
849 self.current_vnode.to_index()
850 );
851 assert_eq!(
852 left_user_key.table_id, right_user_key.table_id,
853 "table_id changed within range: left {:?}, right {:?}",
854 left_user_key, right_user_key
855 );
856
857 self.ranges
858 .insert(self.current_vnode, (left_user_key, right_user_key));
859 }
860
861 fn finish(mut self, last_key: &[u8]) -> Option<VnodeStatistics> {
863 if !self.range_start_key.is_empty() {
864 self.seal_range(last_key);
865 }
866
867 if self.ranges.len() > 1 {
868 let mut table_ids = self
870 .ranges
871 .values()
872 .flat_map(|(left, right)| [left.table_id, right.table_id]);
873 if let Some(first_table_id) = table_ids.next() {
874 for table_id in table_ids {
875 assert_eq!(
876 table_id, first_table_id,
877 "all vnode ranges must belong to the same table_id, found {:?} and {:?}",
878 table_id, first_table_id
879 );
880 }
881 }
882
883 Some(VnodeStatistics::from_map(self.ranges))
884 } else {
885 None
886 }
887 }
888}
889
/// Figures gathered while building an SST, reported to the compactor metrics through
/// `report_stats`.
pub struct SstableBuilderOutputStats {
    /// Length of the filter data stored in the SST meta.
    filter_size: usize,
    /// Average encoded key size over all tables.
    avg_key_size: usize,
    /// Average encoded value size over all tables.
    avg_value_size: usize,
    /// Number of distinct epochs among the added entries.
    epoch_count: usize,
    /// Length of every block that was built.
    block_size_vec: Vec<usize>,
    /// File size recorded in the SST info.
    sstable_file_size: usize,
}
898
899impl SstableBuilderOutputStats {
900 pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
901 if self.filter_size != 0 {
902 metrics
903 .sstable_bloom_filter_size
904 .observe(self.filter_size as _);
905 }
906
907 if self.sstable_file_size != 0 {
908 metrics
909 .sstable_file_size
910 .observe(self.sstable_file_size as _);
911 }
912
913 if self.avg_key_size != 0 {
914 metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
915 }
916
917 if self.avg_value_size != 0 {
918 metrics
919 .sstable_avg_value_size
920 .observe(self.avg_value_size as _);
921 }
922
923 if self.epoch_count != 0 {
924 metrics
925 .sstable_distinct_epoch_count
926 .observe(self.epoch_count as _);
927 }
928
929 if !self.block_size_vec.is_empty() {
930 for block_size in &self.block_size_vec {
931 metrics.sstable_block_size.observe(*block_size as _);
932 }
933 }
934 }
935}
936
937#[cfg(test)]
938pub(super) mod tests {
939 use std::collections::{Bound, HashMap};
940
941 use risingwave_common::catalog::TableId;
942 use risingwave_common::hash::VirtualNode;
943 use risingwave_common::util::epoch::test_epoch;
944 use risingwave_hummock_sdk::key::UserKey;
945
946 use super::*;
947 use crate::assert_bytes_eq;
948 use crate::compaction_catalog_manager::{
949 CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
950 };
951 use crate::hummock::iterator::test_utils::mock_sstable_store;
952 use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
953 use crate::hummock::test_utils::{
954 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
955 test_key_of, test_value_of,
956 };
957 use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
958 use crate::monitor::StoreLocalStatistic;
959
960 #[tokio::test]
961 async fn test_empty() {
962 let opt = SstableBuilderOptions {
963 capacity: 0,
964 block_capacity: 4096,
965 restart_interval: 16,
966 bloom_false_positive: 0.001,
967 ..Default::default()
968 };
969
970 let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
971 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
972 let b = SstableBuilder::for_test(
973 0,
974 mock_sst_writer(&opt),
975 opt,
976 table_id_to_vnode,
977 table_id_to_watermark_serde,
978 );
979
980 b.finish().await.unwrap();
981 }
982
983 fn encode_full_key(vnode: VirtualNode, table_key_suffix: &[u8]) -> Vec<u8> {
984 let mut table_key = vnode.to_be_bytes().to_vec();
985 table_key.extend_from_slice(table_key_suffix);
986 FullKey::for_test(TableId::default(), table_key, 0).encode()
987 }
988
989 fn table_key_of(vnode: VirtualNode, suffix: &[u8]) -> Vec<u8> {
990 let mut key = vnode.to_be_bytes().to_vec();
991 key.extend_from_slice(suffix);
992 key
993 }
994
/// Two vnodes with two keys each yield one range per vnode, spanning its first and last key.
#[test]
fn test_vnode_user_key_range_basic_collection() {
    let vnode_1 = VirtualNode::from_index(1);
    let vnode_2 = VirtualNode::from_index(2);

    let keys = [
        (vnode_1, encode_full_key(vnode_1, b"k1")),
        (vnode_1, encode_full_key(vnode_1, b"k2")),
        (vnode_2, encode_full_key(vnode_2, b"k3")),
        (vnode_2, encode_full_key(vnode_2, b"k4")),
    ];

    let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
    let mut prev_key: &[u8] = &[];
    for (vnode, key) in &keys {
        collector.observe_key(*vnode, key, prev_key);
        prev_key = key.as_slice();
    }

    let info = collector.finish(prev_key).unwrap();
    assert_eq!(info.vnode_user_key_ranges().len(), 2);

    let expected: [(VirtualNode, &[u8], &[u8]); 2] =
        [(vnode_1, b"k1", b"k2"), (vnode_2, b"k3", b"k4")];
    for (vnode, first_suffix, last_suffix) in expected {
        let (range_left, range_right) = info.get_vnode_user_key_range(vnode).unwrap();
        assert_eq!(
            range_left.table_key.as_ref(),
            table_key_of(vnode, first_suffix)
        );
        assert_eq!(
            range_right.table_key.as_ref(),
            table_key_of(vnode, last_suffix)
        );
    }
}
1032
/// With a budget of one full range plus one more range start, exactly the first two vnodes are
/// collected: sealing the second range exhausts the budget, so later vnodes are ignored.
#[test]
fn test_vnode_user_key_range_capacity_limit() {
    let vnode_1 = VirtualNode::from_index(1);
    let vnode_2 = VirtualNode::from_index(2);
    let vnode_3 = VirtualNode::from_index(3);
    let vnode_4 = VirtualNode::from_index(4);

    let keys = [
        (vnode_1, encode_full_key(vnode_1, b"k1")),
        (vnode_1, encode_full_key(vnode_1, b"k2")),
        (vnode_2, encode_full_key(vnode_2, b"k3")),
        (vnode_2, encode_full_key(vnode_2, b"k4")),
        (vnode_3, encode_full_key(vnode_3, b"k5")),
        (vnode_3, encode_full_key(vnode_3, b"k6")),
        (vnode_4, encode_full_key(vnode_4, b"k7")),
    ];

    let vnode_size = mem::size_of::<VirtualNode>();
    let range_size = vnode_size + keys[0].1.len() + keys[1].1.len();
    let estimated_next_size = vnode_size + keys[2].1.len();
    let limit = range_size + estimated_next_size;
    let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(limit)).unwrap();

    let mut prev_key: &[u8] = &[];
    for (vnode, key) in &keys {
        collector.observe_key(*vnode, key, prev_key);
        prev_key = key.as_slice();
    }

    let info = collector.finish(prev_key).unwrap();
    assert_eq!(
        info.vnode_user_key_ranges().len(),
        2,
        "Should collect exactly 2 vnodes"
    );

    let expected: [(VirtualNode, &[u8], &[u8]); 2] =
        [(vnode_1, b"k1", b"k2"), (vnode_2, b"k3", b"k4")];
    for (vnode, first_suffix, last_suffix) in expected {
        let (range_left, range_right) = info.get_vnode_user_key_range(vnode).unwrap();
        assert_eq!(
            range_left.table_key.as_ref(),
            table_key_of(vnode, first_suffix)
        );
        assert_eq!(
            range_right.table_key.as_ref(),
            table_key_of(vnode, last_suffix)
        );
    }

    for dropped_vnode in [vnode_3, vnode_4] {
        assert!(info.get_vnode_user_key_range(dropped_vnode).is_none());
    }
}
1096
/// Non-contiguous vnodes each get their own range, and vnodes without keys have none.
#[test]
fn test_vnode_user_key_range_sparse_distribution() {
    let vnode_5 = VirtualNode::from_index(5);
    let vnode_10 = VirtualNode::from_index(10);
    let vnode_100 = VirtualNode::from_index(100);

    let keys = vec![
        (vnode_5, encode_full_key(vnode_5, b"key_005_001")),
        (vnode_5, encode_full_key(vnode_5, b"key_005_002")),
        (vnode_5, encode_full_key(vnode_5, b"key_005_999")),
        (vnode_10, encode_full_key(vnode_10, b"key_010_001")),
        (vnode_10, encode_full_key(vnode_10, b"key_010_100")),
        (vnode_100, encode_full_key(vnode_100, b"key_100_001")),
        (vnode_100, encode_full_key(vnode_100, b"key_100_999")),
    ];

    let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
    let mut prev_key = Vec::new();
    for (vnode, key) in &keys {
        collector.observe_key(*vnode, key, &prev_key);
        prev_key = key.clone();
    }

    let info = collector.finish(&prev_key).unwrap();
    assert_eq!(info.vnode_user_key_ranges().len(), 3);

    let expected: [(VirtualNode, &[u8], &[u8]); 3] = [
        (vnode_5, b"key_005_001", b"key_005_999"),
        (vnode_10, b"key_010_001", b"key_010_100"),
        (vnode_100, b"key_100_001", b"key_100_999"),
    ];
    for (vnode, first_suffix, last_suffix) in expected {
        let (range_left, range_right) = info.get_vnode_user_key_range(vnode).unwrap();
        assert_eq!(
            range_left.table_key.as_ref(),
            table_key_of(vnode, first_suffix)
        );
        assert_eq!(
            range_right.table_key.as_ref(),
            table_key_of(vnode, last_suffix)
        );
    }

    // Vnodes that never appeared must not have a range.
    for absent_index in [0, 7, 50] {
        assert!(
            info.get_vnode_user_key_range(VirtualNode::from_index(absent_index))
                .is_none()
        );
    }
}
1170
/// Covers the situations in which no vnode range info is produced:
/// - `with_limit` is given `None` or `Some(0)`, so no collector is created;
/// - `finish` is called without any key having been observed;
/// - all observed keys belong to a single vnode;
/// - the limit is `Some(1)` while keys of two vnodes are observed.
#[test]
fn test_vnode_user_key_range_edge_cases() {
    // Collection disabled: no collector is handed out.
    for disabled_limit in [None, Some(0)] {
        assert!(VnodeUserKeyRangeCollector::with_limit(disabled_limit).is_none());
    }

    // Nothing observed before `finish`.
    {
        let untouched = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        let probe_key = encode_full_key(VirtualNode::ZERO, b"any");
        assert!(untouched.finish(&probe_key).is_none());
    }

    // Every key belongs to the same vnode.
    {
        let vnode = VirtualNode::from_index(5);
        let mut single = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        let [first, second, third] =
            [b"a", b"b", b"c"].map(|suffix| encode_full_key(vnode, suffix));
        single.observe_key(vnode, &first, &[]);
        single.observe_key(vnode, &second, &first);
        single.observe_key(vnode, &third, &second);
        assert!(
            single.finish(&third).is_none(),
            "Single-vnode SST should not emit hints"
        );
    }

    // Two vnodes are observed, but the collector was created with a limit of 1.
    {
        let mut capped = VnodeUserKeyRangeCollector::with_limit(Some(1)).unwrap();
        let vnode_1 = VirtualNode::from_index(1);
        let vnode_2 = VirtualNode::from_index(2);
        let key1 = encode_full_key(vnode_1, b"key1");
        let key2 = encode_full_key(vnode_1, b"key2");
        let key3 = encode_full_key(vnode_2, b"key3");
        capped.observe_key(vnode_1, &key1, &[]);
        capped.observe_key(vnode_1, &key2, &key1);
        capped.observe_key(vnode_2, &key3, &key2);
        assert!(
            capped.finish(&key3).is_none(),
            "Only 1 vnode collected, should return None"
        );
    }
}
1217
1218 #[tokio::test]
1219 async fn test_basic() {
1220 let opt = default_builder_opt_for_test();
1221
1222 let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
1223 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
1224 let mut b = SstableBuilder::for_test(
1225 0,
1226 mock_sst_writer(&opt),
1227 opt,
1228 table_id_to_vnode,
1229 table_id_to_watermark_serde,
1230 );
1231
1232 for i in 0..TEST_KEYS_COUNT {
1233 b.add_for_test(
1234 test_key_of(i).to_ref(),
1235 HummockValue::put(&test_value_of(i)),
1236 )
1237 .await
1238 .unwrap();
1239 }
1240
1241 let output = b.finish().await.unwrap();
1242 let info = output.sst_info.sst_info;
1243
1244 assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
1245 assert_bytes_eq!(
1246 test_key_of(TEST_KEYS_COUNT - 1).encode(),
1247 info.key_range.right
1248 );
1249 let (data, meta) = output.writer_output;
1250 assert_eq!(info.file_size, meta.estimated_size as u64);
1251 let offset = info.meta_offset as usize;
1252 let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
1253 assert_eq!(meta2, meta);
1254 }
1255
1256 async fn test_with_xor_filter_builder<F: FilterBuilder>(
1257 bloom_false_positive: f64,
1258 expected_filter_type: PbSstableFilterType,
1259 ) {
1260 let key_count = 1000;
1261
1262 let opts = SstableBuilderOptions {
1263 capacity: 0,
1264 block_capacity: 4096,
1265 restart_interval: 16,
1266 bloom_false_positive,
1267 ..Default::default()
1268 };
1269
1270 let sstable_store = mock_sstable_store().await;
1272 let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
1273 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
1274 let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
1275 opts,
1276 0,
1277 (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
1278 sstable_store.clone(),
1279 CachePolicy::NotFill,
1280 table_id_to_vnode,
1281 table_id_to_watermark_serde,
1282 )
1283 .await;
1284 assert_eq!(sst_info.filter_type, expected_filter_type);
1285 let table = sstable_store
1286 .sstable(&sst_info, &mut StoreLocalStatistic::default())
1287 .await
1288 .unwrap();
1289
1290 assert!(table.has_filter());
1291 for i in 0..key_count {
1292 let full_key = test_key_of(i);
1293 let hash = Sstable::hash_for_filter(full_key.user_key.encode().as_slice(), 0);
1294 let key_ref = full_key.user_key.as_ref();
1295 assert!(
1296 table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash),
1297 "failed at {}",
1298 i
1299 );
1300 }
1301 }
1302
1303 #[tokio::test]
1304 async fn test_xor_filter_builder_output() {
1305 test_with_xor_filter_builder::<Xor16FilterBuilder>(
1306 0.0,
1307 PbSstableFilterType::SstableFilterXor16,
1308 )
1309 .await;
1310 test_with_xor_filter_builder::<Xor16FilterBuilder>(
1311 0.01,
1312 PbSstableFilterType::SstableFilterXor16,
1313 )
1314 .await;
1315 test_with_xor_filter_builder::<Xor8FilterBuilder>(
1316 0.01,
1317 PbSstableFilterType::SstableFilterXor8,
1318 )
1319 .await;
1320 test_with_xor_filter_builder::<BlockedXor16FilterBuilder>(
1321 0.01,
1322 PbSstableFilterType::SstableFilterXor16,
1323 )
1324 .await;
1325 }
1326
1327 #[tokio::test]
1328 async fn test_no_xor_filter_block() {
1329 let opts = SstableBuilderOptions::default();
1330 let sstable_store = mock_sstable_store().await;
1332 let writer_opts = SstableWriterOptions::default();
1333 let object_id = 1;
1334 let writer = sstable_store
1335 .clone()
1336 .create_sst_writer(object_id, writer_opts);
1337 let mut filter = MultiFilterKeyExtractor::default();
1338 filter.register(
1339 1.into(),
1340 FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
1341 );
1342 filter.register(
1343 2.into(),
1344 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
1345 );
1346 filter.register(
1347 3.into(),
1348 FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
1349 );
1350
1351 let table_id_to_vnode = HashMap::from_iter(vec![
1352 (1.into(), VirtualNode::COUNT_FOR_TEST),
1353 (2.into(), VirtualNode::COUNT_FOR_TEST),
1354 (3.into(), VirtualNode::COUNT_FOR_TEST),
1355 ]);
1356 let table_id_to_watermark_serde =
1357 HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);
1358
1359 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1360 FilterKeyExtractorImpl::Multi(filter),
1361 table_id_to_vnode,
1362 table_id_to_watermark_serde,
1363 HashMap::default(),
1364 ));
1365
1366 let mut builder = SstableBuilder::new(
1367 object_id,
1368 writer,
1369 BlockedXor16FilterBuilder::create(opts.filter_builder_options()),
1370 opts,
1371 compaction_catalog_agent_ref,
1372 None,
1373 );
1374
1375 let key_count: usize = 10000;
1376 for table_id in 1..4 {
1377 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
1378 for idx in 0..key_count {
1379 table_key.resize(VirtualNode::SIZE, 0);
1380 table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
1381 let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
1382 let v = test_value_of(idx);
1383 builder
1384 .add(
1385 FullKey::from_user_key(k, test_epoch(1)),
1386 HummockValue::put(v.as_ref()),
1387 )
1388 .await
1389 .unwrap();
1390 }
1391 }
1392 let ret = builder.finish().await.unwrap();
1393 let sst_info = ret.sst_info.sst_info.clone();
1394 ret.writer_output.await.unwrap().unwrap();
1395 let table = sstable_store
1396 .sstable(&sst_info, &mut StoreLocalStatistic::default())
1397 .await
1398 .unwrap();
1399 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
1400 for idx in 0..key_count {
1401 table_key.resize(VirtualNode::SIZE, 0);
1402 table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
1403 let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
1404 let hash = Sstable::hash_for_filter(&k.encode(), 2);
1405 let key_ref = k.as_ref();
1406 assert!(
1407 table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
1408 );
1409 }
1410 }
1411}