1use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::mem;
17use std::sync::Arc;
18use std::time::SystemTime;
19
20use bytes::{Bytes, BytesMut};
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, TABLE_PREFIX_LEN, UserKey, user_key};
25use risingwave_hummock_sdk::key_range::KeyRange;
26use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner, VnodeStatistics};
27use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
28use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
29use risingwave_pb::hummock::BloomFilterType;
30
31use super::utils::CompressionAlgorithm;
32use super::{
33 BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
34 DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
35};
36use crate::compaction_catalog_manager::{
37 CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
38 FullKeyFilterKeyExtractor,
39};
40use crate::hummock::sstable::{FilterBuilder, utils};
41use crate::hummock::value::HummockValue;
42use crate::hummock::{
43 Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
44 try_shorten_block_smallest_key,
45};
46use crate::monitor::CompactorMetrics;
47use crate::opts::StorageOpts;
48
/// Default target size of a whole SSTable (4 MiB).
pub const DEFAULT_SSTABLE_SIZE: usize = 4 * 1024 * 1024;
/// Default false-positive rate used when sizing bloom filters.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
/// Default hard cap on a single SST produced by the compactor (512 MiB).
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 * 1024 * 1024;
/// Minimum size a pending block must reach before `add_raw_block` will seal it
/// instead of merging the incoming raw block into it (8 KiB).
pub const MIN_BLOCK_SIZE: usize = 8 * 1024;
53
/// Tunable knobs controlling how an `SstableBuilder` lays out data.
#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Approximate target capacity of the whole SST, in bytes.
    pub capacity: usize,
    /// Approximate capacity of a single data block, in bytes.
    pub block_capacity: usize,
    /// Restart-point interval passed to the block builder.
    pub restart_interval: usize,
    /// False-positive rate of the bloom filter; `<= 0.0` disables the filter.
    pub bloom_false_positive: f64,
    /// Compression applied to each block.
    pub compression_algorithm: CompressionAlgorithm,
    /// Hard upper bound for the SST size, in bytes.
    pub max_sst_size: u64,
    /// If set, block-meta smallest keys at least this long may be shortened
    /// to save meta space.
    pub shorten_block_meta_key_threshold: Option<usize>,
    /// If set to a positive limit, collect per-vnode user-key range hints up
    /// to this many bytes.
    pub max_vnode_key_range_bytes: Option<usize>,
}
72
73impl From<&StorageOpts> for SstableBuilderOptions {
74 fn from(options: &StorageOpts) -> SstableBuilderOptions {
75 let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
76 SstableBuilderOptions {
77 capacity,
78 block_capacity: (options.block_size_kb as usize) * (1 << 10),
79 restart_interval: DEFAULT_RESTART_INTERVAL,
80 bloom_false_positive: options.bloom_false_positive,
81 compression_algorithm: CompressionAlgorithm::None,
82 max_sst_size: options.compactor_max_sst_size,
83 shorten_block_meta_key_threshold: options.shorten_block_meta_key_threshold,
84 max_vnode_key_range_bytes: None,
85 }
86 }
87}
88
impl Default for SstableBuilderOptions {
    /// Defaults suitable for tests and standalone builders: the module-level
    /// default constants, no compression, and optional features disabled.
    fn default() -> Self {
        Self {
            capacity: DEFAULT_SSTABLE_SIZE,
            block_capacity: DEFAULT_BLOCK_SIZE,
            restart_interval: DEFAULT_RESTART_INTERVAL,
            bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
            compression_algorithm: CompressionAlgorithm::None,
            max_sst_size: DEFAULT_MAX_SST_SIZE,
            shorten_block_meta_key_threshold: None,
            max_vnode_key_range_bytes: None,
        }
    }
}
103
/// Everything produced by [`SstableBuilder::finish`]: the SST descriptor, the
/// writer's own output (`WO`), and build statistics for metrics reporting.
pub struct SstableBuilderOutput<WO> {
    pub sst_info: LocalSstableInfo,
    pub writer_output: WO,
    pub stats: SstableBuilderOutputStats,
}
109
/// Streams sorted key/value pairs into compressed blocks and finally produces
/// an SST (data + meta) through the underlying [`SstableWriter`].
pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Builder options (capacities, compression, thresholds).
    options: SstableBuilderOptions,
    /// Destination for data blocks and meta.
    writer: W,
    /// Builder for the block currently being filled.
    block_builder: BlockBuilder,

    /// Used to extract bloom-filter keys per table.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Meta for every finished (or in-progress) block.
    block_metas: Vec<BlockMeta>,

    /// All table ids that contributed keys to this SST.
    table_ids: BTreeSet<TableId>,
    /// Encoded full key of the most recently added entry; empty before the first.
    last_full_key: Vec<u8>,
    /// Reusable encode buffer for the current key.
    raw_key: BytesMut,
    /// Reusable encode buffer for the current value.
    raw_value: BytesMut,
    /// Table id of the most recently added entry, if any.
    last_table_id: Option<TableId>,
    sst_object_id: HummockSstableObjectId,

    /// Finalized per-table statistics.
    table_stats: TableStatsMap,
    /// Stats being accumulated for the current table; flushed on table switch.
    last_table_stats: TableStats,

    filter_builder: F,

    /// Distinct pure epochs observed; used to derive min/max epoch at finish.
    epoch_set: BTreeSet<u64>,
    memory_limiter: Option<Arc<MemoryLimiter>>,

    /// Compressed size of each finished block, reported as metrics.
    block_size_vec: Vec<usize>,
    /// Optional collector producing per-vnode user-key range hints.
    vnode_range_collector: Option<VnodeUserKeyRangeCollector>,
}
145
impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
    /// Test-only constructor: wires a full-key filter extractor and a
    /// `Xor16FilterBuilder` sized from the target capacity, with no memory
    /// limiter.
    pub fn for_test(
        sstable_id: u64,
        writer: W,
        options: SstableBuilderOptions,
        table_id_to_vnode: HashMap<impl Into<TableId>, usize>,
        table_id_to_watermark_serde: HashMap<
            impl Into<TableId>,
            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
        >,
    ) -> Self {
        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
            table_id_to_vnode
                .into_iter()
                .map(|(table_id, v)| (table_id.into(), v))
                .collect(),
            table_id_to_watermark_serde
                .into_iter()
                .map(|(table_id, v)| (table_id.into(), v))
                .collect(),
            HashMap::default(),
        ));

        Self::new(
            sstable_id,
            writer,
            // One filter slot per expected entry, rounded up.
            Xor16FilterBuilder::new(options.capacity / DEFAULT_ENTRY_SIZE + 1),
            options,
            compaction_catalog_agent_ref,
            None,
        )
    }
}
180
181impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
    /// Creates a builder that writes to `writer` under the given options.
    ///
    /// The vnode key-range collector is only instantiated when
    /// `options.max_vnode_key_range_bytes` is a positive limit.
    pub fn new(
        sst_object_id: impl Into<HummockSstableObjectId>,
        writer: W,
        filter_builder: F,
        options: SstableBuilderOptions,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        memory_limiter: Option<Arc<MemoryLimiter>>,
    ) -> Self {
        let sst_object_id = sst_object_id.into();
        Self {
            vnode_range_collector: VnodeUserKeyRangeCollector::with_limit(
                options.max_vnode_key_range_bytes,
            ),
            // Clone here; the field initializers below still read from `options`.
            options: options.clone(),
            writer,
            block_builder: BlockBuilder::new(BlockBuilderOptions {
                capacity: options.block_capacity,
                restart_interval: options.restart_interval,
                compression_algorithm: options.compression_algorithm,
            }),
            filter_builder,
            // Pre-size for the expected number of blocks.
            block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
            table_ids: BTreeSet::new(),
            last_table_id: None,
            raw_key: BytesMut::new(),
            raw_value: BytesMut::new(),
            last_full_key: vec![],
            sst_object_id,
            compaction_catalog_agent_ref,
            table_stats: Default::default(),
            last_table_stats: Default::default(),
            epoch_set: BTreeSet::default(),
            memory_limiter,
            block_size_vec: Vec::new(),
        }
    }
218
    /// Test-only alias of [`Self::add`].
    pub async fn add_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add(full_key, value).await
    }
227
    /// Approximate encoded size of the block currently being built.
    pub fn current_block_size(&self) -> usize {
        self.block_builder.approximate_len()
    }
231
    /// Appends an already-built block (used by fast compaction) verbatim.
    ///
    /// Returns `Ok(true)` when the raw block was written as-is, and
    /// `Ok(false)` when it was decoded and its entries re-added one by one —
    /// which happens when the in-progress block is still too small to seal.
    pub async fn add_raw_block(
        &mut self,
        buf: Bytes,
        filter_data: Vec<u8>,
        smallest_key: FullKey<Vec<u8>>,
        largest_key: Vec<u8>,
        mut meta: BlockMeta,
    ) -> HummockResult<bool> {
        let table_id = smallest_key.user_key.table_id;
        // Table switch: seal the pending block and roll per-table stats.
        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }

            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
        }

        if !self.block_builder.is_empty() {
            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);

            // The pending block is too small to stand alone: merge the raw
            // block into it entry-by-entry instead of writing it verbatim.
            if self.block_builder.approximate_len() < min_block_size {
                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
                iter.seek_to_first();
                while iter.is_valid() {
                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
                        panic!(
                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
                            self.sst_object_id, self.block_metas.len(), self.last_table_id
                        )
                    });
                    // `could_switch_block == false`: all entries stay in this block's table.
                    self.add_impl(iter.key(), value, false).await?;
                    iter.next();
                }
                return Ok(false);
            }

            self.build_block().await?;
        }
        self.last_full_key = largest_key;
        assert_eq!(
            meta.len as usize,
            buf.len(),
            "meta {} buf {} last_table_id {:?}",
            meta.len,
            buf.len(),
            self.last_table_id
        );
        // Re-anchor the imported meta at this SST's current write offset.
        meta.offset = self.writer.data_len() as u32;
        self.block_metas.push(meta);
        self.filter_builder.add_raw_data(filter_data);
        let block_meta = self.block_metas.last_mut().unwrap();
        self.writer.write_block_bytes(buf, block_meta).await?;

        Ok(true)
    }
293
    /// Adds one key/value pair; may seal the current block or switch tables.
    /// Keys must be fed in sorted order.
    pub async fn add(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add_impl(full_key, value, true).await
    }
302
    /// Core insertion path for one key/value pair.
    ///
    /// `could_switch_block` controls whether a new block may be started here;
    /// it is `false` when re-adding entries from a raw block so that they all
    /// land in the current block (and table switches become a hard error).
    async fn add_impl(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        could_switch_block: bool,
    ) -> HummockResult<()> {
        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;

        let table_key_len = full_key.user_key.table_key.as_ref().len();
        let table_value_len = match &value {
            HummockValue::Put(t) => t.len(),
            HummockValue::Delete => 0,
        };
        // Oversized entries are only logged, never rejected.
        let large_value_len = self.options.max_sst_size as usize / 10;
        let large_key_value_len = self.options.max_sst_size as usize / 2;
        if table_key_len >= LARGE_KEY_LEN
            || table_value_len > large_value_len
            || table_key_len + table_value_len > large_key_value_len
        {
            let table_id = full_key.user_key.table_id;
            tracing::warn!(
                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
                table_id,
                table_key_len,
                table_value_len,
                full_key.epoch_with_gap.pure_epoch(),
                full_key.epoch_with_gap.offset(),
            );
        }

        // Encode into the reusable buffers (cleared again at the end).
        full_key.encode_into(&mut self.raw_key);
        value.encode(&mut self.raw_value);
        let is_new_user_key = self.last_full_key.is_empty()
            || !user_key(&self.raw_key).eq(user_key(self.last_full_key.as_slice()));
        let table_id = full_key.user_key.table_id;
        let is_new_table = self.last_table_id != Some(table_id);
        let current_block_size = self.current_block_size();
        // Seal early once the block is 3/4 full and this entry would overflow it.
        let is_block_full = current_block_size >= self.options.block_capacity
            || (current_block_size > self.options.block_capacity / 4 * 3
                && current_block_size + self.raw_value.len() + self.raw_key.len()
                    > self.options.block_capacity);

        if is_new_table {
            assert!(
                could_switch_block,
                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
                is_new_user_key,
                self.sst_object_id,
                self.block_metas.len(),
                table_id,
                self.last_table_id,
                full_key
            );
            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }
        } else if is_block_full && could_switch_block {
            self.build_block().await?;
        }
        self.last_table_stats.total_key_count += 1;
        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());

        // Starting a fresh block: record its meta, possibly with a shortened
        // smallest key to reduce meta size.
        if self.block_builder.is_empty() {
            let smallest_key = if let Some(threshold) =
                self.options.shorten_block_meta_key_threshold
                && !self.last_full_key.is_empty()
                && full_key.encoded_len() >= threshold
            {
                let prev = FullKey::decode(&self.last_full_key);
                if let Some(shortened) = try_shorten_block_smallest_key(&prev, &full_key) {
                    shortened.encode()
                } else {
                    full_key.encode()
                }
            } else {
                full_key.encode()
            };

            self.block_metas.push(BlockMeta {
                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
                    panic!(
                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
                        self.writer.data_len(),
                        self.sst_object_id,
                        self.block_metas.len(),
                        self.table_ids,
                    )
                }),
                // len/uncompressed_size are filled in by `build_block`.
                len: 0,
                smallest_key,
                uncompressed_size: 0,
                total_key_count: 0,
                stale_key_count: 0,
            });
        }

        let filter_key = self
            .compaction_catalog_agent_ref
            .extract(user_key(&self.raw_key));
        // An empty filter key means the extractor opted out for this key.
        if !filter_key.is_empty() {
            self.filter_builder
                .add_key(filter_key, table_id.as_raw_id());
        }
        self.block_builder.add(
            table_id,
            &self.raw_key[TABLE_PREFIX_LEN..],
            self.raw_value.as_ref(),
        );
        self.block_metas.last_mut().unwrap().total_key_count += 1;
        // Older versions of an existing user key, and deletes, count as stale.
        if !is_new_user_key || value.is_delete() {
            self.block_metas.last_mut().unwrap().stale_key_count += 1;
        }
        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
        self.last_table_stats.total_value_size += value.encoded_len() as i64;

        // Feed the vnode range collector before `last_full_key` is overwritten,
        // so it can seal the previous vnode's range with the previous key.
        if let Some(collector) = self.vnode_range_collector.as_mut() {
            collector.observe_key(
                VirtualNode::from_index(full_key.user_key.get_vnode_id()),
                &self.raw_key,
                &self.last_full_key,
            );
        }

        self.last_full_key.clear();
        self.last_full_key.extend_from_slice(&self.raw_key);

        self.raw_key.clear();
        self.raw_value.clear();
        Ok(())
    }
441
    /// Seals the last block, writes the SST meta via the writer, and returns
    /// the resulting descriptor, writer output and build statistics.
    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
        let smallest_key = if self.block_metas.is_empty() {
            vec![]
        } else {
            self.block_metas[0].smallest_key.clone()
        };
        let largest_key = self.last_full_key.clone();
        self.finalize_last_table_stats();

        // Vnode key-range hints only make sense for single-table SSTs.
        assert!(
            self.table_ids.len() <= 1 || self.vnode_range_collector.is_none(),
            "vnode key-range hints are only supported for single-table SSTs, found {} tables",
            self.table_ids.len()
        );

        self.build_block().await?;
        let right_exclusive = false;
        // Everything written so far is data; the meta starts here.
        let meta_offset = self.writer.data_len() as u64;

        let bloom_filter_kind = if self.filter_builder.support_blocked_raw_data() {
            BloomFilterType::Blocked
        } else {
            BloomFilterType::Sstable
        };
        // A non-positive false-positive rate disables the bloom filter.
        let bloom_filter = if self.options.bloom_false_positive > 0.0 {
            self.filter_builder.finish(self.memory_limiter.clone())
        } else {
            vec![]
        };

        let total_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.total_key_count as u64)
            .sum::<u64>();
        let stale_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.stale_key_count as u64)
            .sum::<u64>();
        let uncompressed_file_size = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.uncompressed_size as u64)
            .sum::<u64>();

        #[expect(deprecated)]
        let mut meta = SstableMeta {
            block_metas: self.block_metas,
            bloom_filter,
            // Filled in below once the encoded size is known.
            estimated_size: 0,
            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
                    total_key_count, self.table_ids,
                )
            }),
            smallest_key,
            largest_key,
            version: VERSION,
            meta_offset,
            monotonic_tombstone_events: vec![],
        };

        // estimated_size = encoded meta size + data size, with overflow checks.
        let meta_encode_size = meta.encoded_size();
        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
                meta_encode_size, self.table_ids,
            )
        });
        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
                meta_offset, self.table_ids,
            )
        });
        meta.estimated_size = encoded_size_u32
            .checked_add(meta_offset_u32)
            .unwrap_or_else(|| {
                panic!(
                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
                )
            });

        // Average key/value sizes across all tables, guarding against division
        // by zero for empty SSTs.
        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
            (0, 0)
        } else {
            let total_key_count: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_count as usize)
                .sum();

            if total_key_count == 0 {
                (0, 0)
            } else {
                let total_key_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_key_size as usize)
                    .sum();

                let total_value_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_value_size as usize)
                    .sum();

                (
                    total_key_size / total_key_count,
                    total_value_size / total_key_count,
                )
            }
        };

        // `epoch_set` is a BTreeSet, so first/last are min/max pure epochs.
        let (min_epoch, max_epoch) = {
            if self.epoch_set.is_empty() {
                (HummockEpoch::MAX, u64::MIN)
            } else {
                (
                    *self.epoch_set.first().unwrap(),
                    *self.epoch_set.last().unwrap(),
                )
            }
        };

        // Seal any open vnode range with the overall largest key.
        let vnode_user_key_ranges = self
            .vnode_range_collector
            .take()
            .and_then(|collector| collector.finish(&self.last_full_key));

        let sst_info: SstableInfo = SstableInfoInner {
            object_id: self.sst_object_id,
            sst_id: self.sst_object_id.as_raw_id().into(),
            bloom_filter_kind,
            key_range: KeyRange {
                left: Bytes::from(meta.smallest_key.clone()),
                right: Bytes::from(meta.largest_key.clone()),
                right_exclusive,
            },
            file_size: meta.estimated_size as u64,
            table_ids: self.table_ids.into_iter().collect(),
            meta_offset: meta.meta_offset,
            stale_key_count,
            total_key_count,
            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
            min_epoch,
            max_epoch,
            range_tombstone_count: 0,
            sst_size: meta.estimated_size as u64,
            vnode_statistics: vnode_user_key_ranges,
        }
        .into();

        tracing::trace!(
            "meta_size {} bloom_filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
            meta.encoded_size(),
            meta.bloom_filter.len(),
            total_key_count,
            stale_key_count,
            min_epoch,
            max_epoch,
            self.epoch_set.len()
        );
        let bloom_filter_size = meta.bloom_filter.len();
        let sstable_file_size = sst_info.file_size as usize;

        // Attribute each block's compressed size to its owning table's stats.
        // Blocks are table-contiguous, so only re-resolve on table change.
        if !meta.block_metas.is_empty() {
            let mut last_table_id = meta.block_metas[0].table_id();
            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
            for block_meta in &meta.block_metas {
                let block_table_id = block_meta.table_id();
                if last_table_id != block_table_id {
                    last_table_id = block_table_id;
                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
                }

                last_table_stats.total_compressed_size += block_meta.len as u64;
            }
        }

        let writer_output = self.writer.finish(meta).await?;
        // Creation timestamp (secs since Unix epoch) recorded in the SST info.
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        Ok(SstableBuilderOutput::<W::Output> {
            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
            writer_output,
            stats: SstableBuilderOutputStats {
                bloom_filter_size,
                avg_key_size,
                avg_value_size,
                epoch_count: self.epoch_set.len(),
                block_size_vec: self.block_size_vec,
                sstable_file_size,
            },
        })
    }
664
    /// Approximate current size of the SST: written data plus the pending
    /// block and the filter under construction.
    pub fn approximate_len(&self) -> usize {
        self.writer.data_len()
            + self.block_builder.approximate_len()
            + self.filter_builder.approximate_len()
    }
670
    /// Seals the in-progress block: finalizes its meta (uncompressed size and
    /// compressed length), writes it out, and resets the block builder.
    /// No-op when the block is empty.
    pub async fn build_block(&mut self) -> HummockResult<()> {
        // Skip empty block.
        if self.block_builder.is_empty() {
            return Ok(());
        }

        let block_meta = self.block_metas.last_mut().unwrap();
        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
            .unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
                    uncompressed_block_size,
                    self.block_builder.table_id(),
                )
            });
        let block = self.block_builder.build();
        self.writer.write_block(block, block_meta).await?;
        self.block_size_vec.push(block.len());
        self.filter_builder
            .switch_block(self.memory_limiter.clone());
        // Compressed block length = bytes written since this block's offset.
        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
                self.writer.data_len(),
                self.block_builder.table_id(),
            )
        });
        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
            panic!(
                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
                data_len, block_meta.offset
            )
        });

        // Heuristic sanity check; a single block should never dwarf the SST.
        if data_len as usize > self.options.capacity * 2 {
            tracing::warn!(
                "WARN unexpected block size {} table {:?}",
                data_len,
                self.block_builder.table_id()
            );
        }

        self.block_builder.clear();
        Ok(())
    }
717
718 pub fn is_empty(&self) -> bool {
719 self.writer.data_len() > 0
720 }
721
    /// Returns `true` once the approximate SST size reaches the target
    /// capacity, signalling that a new builder should be started.
    pub fn reach_capacity(&self) -> bool {
        self.approximate_len() >= self.options.capacity
    }
726
727 fn finalize_last_table_stats(&mut self) {
728 if self.table_ids.is_empty() || self.last_table_id.is_none() {
729 return;
730 }
731 self.table_stats.insert(
732 self.last_table_id.unwrap(),
733 std::mem::take(&mut self.last_table_stats),
734 );
735 }
736}
737
/// Collects, per vnode, the first and last user key seen while keys arrive in
/// sorted order, bounded by a byte budget on the accumulated range data.
struct VnodeUserKeyRangeCollector {
    /// Byte budget; collection stops once `current_size` reaches it.
    max_bytes: usize,
    /// Approximate bytes consumed by the ranges sealed so far.
    current_size: usize,
    /// Closed (inclusive) user-key range per vnode.
    ranges: BTreeMap<VirtualNode, (UserKey<Bytes>, UserKey<Bytes>)>,
    /// Vnode of the currently open range.
    current_vnode: VirtualNode,
    /// Encoded full key that opened the current range; empty = no open range.
    range_start_key: Vec<u8>,
}
746
747impl VnodeUserKeyRangeCollector {
    /// Creates an empty collector with the given byte budget.
    fn new(max_bytes: usize) -> Self {
        Self {
            max_bytes,
            current_size: 0,
            ranges: BTreeMap::new(),
            current_vnode: VirtualNode::ZERO,
            range_start_key: Vec::new(),
        }
    }
757
758 fn with_limit(max_bytes: Option<usize>) -> Option<Self> {
759 max_bytes.filter(|&n| n > 0).map(Self::new)
760 }
761
    /// Feeds one encoded full key (in sorted order) into the collector.
    ///
    /// `prev_key` is the previously observed key; it becomes the right bound
    /// of the open range when `vnode` differs from the range's vnode.
    /// Collection stops silently once the byte budget is exhausted.
    fn observe_key(&mut self, vnode: VirtualNode, key: &[u8], prev_key: &[u8]) {
        // Budget exhausted: ignore all further keys.
        if self.current_size >= self.max_bytes {
            return;
        }

        // First key ever: open the first range.
        if self.range_start_key.is_empty() {
            self.current_vnode = vnode;
            self.range_start_key = key.to_vec();
            return;
        }

        // Still inside the current vnode: nothing to do.
        if vnode == self.current_vnode {
            return;
        }

        // Vnode changed: the previous key closes the old range.
        self.seal_range(prev_key);

        // Sealing may have exhausted the budget; if so, don't open a new range.
        if self.current_size >= self.max_bytes {
            return;
        }

        self.current_vnode = vnode;
        self.range_start_key = key.to_vec();
    }
793
    /// Closes the open range with `right_key` as its inclusive right bound and
    /// records it under the current vnode, charging its size to the budget.
    ///
    /// # Panics
    /// If the two bounds disagree on vnode or table id, or don't match the
    /// tracked vnode — that would mean keys were fed out of order.
    fn seal_range(&mut self, right_key: &[u8]) {
        // Take the opening key, leaving `range_start_key` empty (no open range).
        let left_key = mem::take(&mut self.range_start_key);
        self.current_size += mem::size_of::<VirtualNode>() + left_key.len() + right_key.len();

        let left_full_key = FullKey::decode(&left_key);
        let right_full_key = FullKey::decode(right_key);
        let left_user_key = left_full_key.user_key.copy_into();
        let right_user_key = right_full_key.user_key.copy_into();

        assert_eq!(
            left_user_key.get_vnode_id(),
            right_user_key.get_vnode_id(),
            "vnode changed within range: left_user {:?}, right_user {:?}",
            left_user_key,
            right_user_key
        );
        assert_eq!(
            left_user_key.get_vnode_id(),
            self.current_vnode.to_index(),
            "vnode mismatch: left {:?}, right {:?}, expected vnode {}",
            left_user_key,
            right_user_key,
            self.current_vnode.to_index()
        );
        assert_eq!(
            left_user_key.table_id, right_user_key.table_id,
            "table_id changed within range: left {:?}, right {:?}",
            left_user_key, right_user_key
        );

        self.ranges
            .insert(self.current_vnode, (left_user_key, right_user_key));
    }
832
    /// Seals any open range with `last_key` and returns the collected hints.
    ///
    /// Returns `None` unless more than one vnode range was collected — a
    /// single range adds nothing beyond the SST's own key range.
    fn finish(mut self, last_key: &[u8]) -> Option<VnodeStatistics> {
        if !self.range_start_key.is_empty() {
            self.seal_range(last_key);
        }

        if self.ranges.len() > 1 {
            // Sanity check: hints are only valid when every range belongs to
            // the same table.
            let mut table_ids = self
                .ranges
                .values()
                .flat_map(|(left, right)| [left.table_id, right.table_id]);
            if let Some(first_table_id) = table_ids.next() {
                for table_id in table_ids {
                    assert_eq!(
                        table_id, first_table_id,
                        "all vnode ranges must belong to the same table_id, found {:?} and {:?}",
                        table_id, first_table_id
                    );
                }
            }

            Some(VnodeStatistics::from_map(self.ranges))
        } else {
            None
        }
    }
860}
861
/// Per-build metrics gathered by the builder, reported via `report_stats`.
pub struct SstableBuilderOutputStats {
    /// Encoded bloom-filter size in bytes (0 when the filter is disabled).
    bloom_filter_size: usize,
    /// Average encoded key size across all tables.
    avg_key_size: usize,
    /// Average encoded value size across all tables.
    avg_value_size: usize,
    /// Number of distinct pure epochs in the SST.
    epoch_count: usize,
    /// Compressed size of each block written.
    block_size_vec: Vec<usize>,
    /// Estimated total file size of the SST.
    sstable_file_size: usize,
}
870
871impl SstableBuilderOutputStats {
872 pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
873 if self.bloom_filter_size != 0 {
874 metrics
875 .sstable_bloom_filter_size
876 .observe(self.bloom_filter_size as _);
877 }
878
879 if self.sstable_file_size != 0 {
880 metrics
881 .sstable_file_size
882 .observe(self.sstable_file_size as _);
883 }
884
885 if self.avg_key_size != 0 {
886 metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
887 }
888
889 if self.avg_value_size != 0 {
890 metrics
891 .sstable_avg_value_size
892 .observe(self.avg_value_size as _);
893 }
894
895 if self.epoch_count != 0 {
896 metrics
897 .sstable_distinct_epoch_count
898 .observe(self.epoch_count as _);
899 }
900
901 if !self.block_size_vec.is_empty() {
902 for block_size in &self.block_size_vec {
903 metrics.sstable_block_size.observe(*block_size as _);
904 }
905 }
906 }
907}
908
909#[cfg(test)]
910pub(super) mod tests {
911 use std::collections::{Bound, HashMap};
912
913 use risingwave_common::catalog::TableId;
914 use risingwave_common::hash::VirtualNode;
915 use risingwave_common::util::epoch::test_epoch;
916 use risingwave_hummock_sdk::key::UserKey;
917
918 use super::*;
919 use crate::assert_bytes_eq;
920 use crate::compaction_catalog_manager::{
921 CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
922 };
923 use crate::hummock::iterator::test_utils::mock_sstable_store;
924 use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
925 use crate::hummock::test_utils::{
926 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
927 test_key_of, test_value_of,
928 };
929 use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
930 use crate::monitor::StoreLocalStatistic;
931
    /// Finishing a builder that never received a key must succeed (producing
    /// an empty SST) rather than panic.
    #[tokio::test]
    async fn test_empty() {
        let opt = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: 0.001,
            ..Default::default()
        };

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        b.finish().await.unwrap();
    }
954
955 fn encode_full_key(vnode: VirtualNode, table_key_suffix: &[u8]) -> Vec<u8> {
956 let mut table_key = vnode.to_be_bytes().to_vec();
957 table_key.extend_from_slice(table_key_suffix);
958 FullKey::for_test(TableId::default(), table_key, 0).encode()
959 }
960
961 fn table_key_of(vnode: VirtualNode, suffix: &[u8]) -> Vec<u8> {
962 let mut key = vnode.to_be_bytes().to_vec();
963 key.extend_from_slice(suffix);
964 key
965 }
966
    /// Two vnodes with two keys each: every vnode's range must span exactly
    /// its first and last observed user key.
    #[test]
    fn test_vnode_user_key_range_basic_collection() {
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        let vnode_1 = VirtualNode::from_index(1);
        let vnode_2 = VirtualNode::from_index(2);

        let k1 = encode_full_key(vnode_1, b"k1");
        let k2 = encode_full_key(vnode_1, b"k2");
        let k3 = encode_full_key(vnode_2, b"k3");
        let k4 = encode_full_key(vnode_2, b"k4");

        collector.observe_key(vnode_1, &k1, &[]);
        collector.observe_key(vnode_1, &k2, &k1);
        // Switching to vnode_2 seals vnode_1's range with k2.
        collector.observe_key(vnode_2, &k3, &k2);
        collector.observe_key(vnode_2, &k4, &k3);

        let info = collector.finish(&k4).unwrap();
        assert_eq!(info.vnode_user_key_ranges().len(), 2);

        let (range1_left, range1_right) = info.get_vnode_user_key_range(vnode_1).unwrap();
        assert_eq!(range1_left.table_key.as_ref(), table_key_of(vnode_1, b"k1"));
        assert_eq!(
            range1_right.table_key.as_ref(),
            table_key_of(vnode_1, b"k2")
        );

        let (range2_left, range2_right) = info.get_vnode_user_key_range(vnode_2).unwrap();
        assert_eq!(range2_left.table_key.as_ref(), table_key_of(vnode_2, b"k3"));
        assert_eq!(
            range2_right.table_key.as_ref(),
            table_key_of(vnode_2, b"k4")
        );
    }
1004
    /// The byte budget must cap collection: only ranges sealed before the
    /// budget runs out are kept, later vnodes are dropped entirely.
    #[test]
    fn test_vnode_user_key_range_capacity_limit() {
        let vnode_1 = VirtualNode::from_index(1);
        let vnode_2 = VirtualNode::from_index(2);
        let vnode_3 = VirtualNode::from_index(3);
        let vnode_4 = VirtualNode::from_index(4);

        let k1 = encode_full_key(vnode_1, b"k1");
        let k2 = encode_full_key(vnode_1, b"k2");
        let k3 = encode_full_key(vnode_2, b"k3");
        let k4 = encode_full_key(vnode_2, b"k4");
        let k5 = encode_full_key(vnode_3, b"k5");
        let k6 = encode_full_key(vnode_3, b"k6");
        let k7 = encode_full_key(vnode_4, b"k7");

        // Budget covers vnode_1's sealed range plus part of vnode_2's, so the
        // first two ranges get sealed but vnode_3's range can never open.
        let range_size = mem::size_of::<VirtualNode>() + k1.len() + k2.len();
        let estimated_next_size = mem::size_of::<VirtualNode>() + k3.len();
        let limit = range_size + estimated_next_size;
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(limit)).unwrap();

        collector.observe_key(vnode_1, &k1, &[]);
        collector.observe_key(vnode_1, &k2, &k1);
        collector.observe_key(vnode_2, &k3, &k2);
        collector.observe_key(vnode_2, &k4, &k3);
        // Sealing vnode_2's range exhausts the budget here.
        collector.observe_key(vnode_3, &k5, &k4);
        collector.observe_key(vnode_3, &k6, &k5);
        collector.observe_key(vnode_4, &k7, &k6);
        let info = collector.finish(&k7).unwrap();
        assert_eq!(
            info.vnode_user_key_ranges().len(),
            2,
            "Should collect exactly 2 vnodes"
        );

        let (range1_left, range1_right) = info.get_vnode_user_key_range(vnode_1).unwrap();
        assert_eq!(range1_left.table_key.as_ref(), table_key_of(vnode_1, b"k1"));
        assert_eq!(
            range1_right.table_key.as_ref(),
            table_key_of(vnode_1, b"k2")
        );

        let (range2_left, range2_right) = info.get_vnode_user_key_range(vnode_2).unwrap();
        assert_eq!(range2_left.table_key.as_ref(), table_key_of(vnode_2, b"k3"));
        assert_eq!(
            range2_right.table_key.as_ref(),
            table_key_of(vnode_2, b"k4")
        );

        // Vnodes observed after the budget ran out must be absent.
        assert!(info.get_vnode_user_key_range(vnode_3).is_none());
        assert!(info.get_vnode_user_key_range(vnode_4).is_none());
    }
1068
    /// Non-contiguous vnodes (5, 10, 100): each observed vnode gets its own
    /// range and vnodes never observed yield no range at all.
    #[test]
    fn test_vnode_user_key_range_sparse_distribution() {
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        let vnode_5 = VirtualNode::from_index(5);
        let vnode_10 = VirtualNode::from_index(10);
        let vnode_100 = VirtualNode::from_index(100);

        let keys = vec![
            (vnode_5, encode_full_key(vnode_5, b"key_005_001")),
            (vnode_5, encode_full_key(vnode_5, b"key_005_002")),
            (vnode_5, encode_full_key(vnode_5, b"key_005_999")),
            (vnode_10, encode_full_key(vnode_10, b"key_010_001")),
            (vnode_10, encode_full_key(vnode_10, b"key_010_100")),
            (vnode_100, encode_full_key(vnode_100, b"key_100_001")),
            (vnode_100, encode_full_key(vnode_100, b"key_100_999")),
        ];

        let mut prev_key = Vec::new();
        for (vnode, key) in &keys {
            collector.observe_key(*vnode, key, &prev_key);
            prev_key = key.clone();
        }

        let info = collector.finish(&prev_key).unwrap();
        assert_eq!(info.vnode_user_key_ranges().len(), 3);

        let (range5_left, range5_right) = info.get_vnode_user_key_range(vnode_5).unwrap();
        assert_eq!(
            range5_left.table_key.as_ref(),
            table_key_of(vnode_5, b"key_005_001")
        );
        assert_eq!(
            range5_right.table_key.as_ref(),
            table_key_of(vnode_5, b"key_005_999")
        );

        let (range10_left, range10_right) = info.get_vnode_user_key_range(vnode_10).unwrap();
        assert_eq!(
            range10_left.table_key.as_ref(),
            table_key_of(vnode_10, b"key_010_001")
        );
        assert_eq!(
            range10_right.table_key.as_ref(),
            table_key_of(vnode_10, b"key_010_100")
        );

        let (range100_left, range100_right) = info.get_vnode_user_key_range(vnode_100).unwrap();
        assert_eq!(
            range100_left.table_key.as_ref(),
            table_key_of(vnode_100, b"key_100_001")
        );
        assert_eq!(
            range100_right.table_key.as_ref(),
            table_key_of(vnode_100, b"key_100_999")
        );

        // Vnodes that never appeared in the input have no range.
        assert!(
            info.get_vnode_user_key_range(VirtualNode::from_index(0))
                .is_none()
        );
        assert!(
            info.get_vnode_user_key_range(VirtualNode::from_index(7))
                .is_none()
        );
        assert!(
            info.get_vnode_user_key_range(VirtualNode::from_index(50))
                .is_none()
        );
    }
1142
1143 #[test]
1144 fn test_vnode_user_key_range_edge_cases() {
1145 assert!(VnodeUserKeyRangeCollector::with_limit(None).is_none());
1147 assert!(VnodeUserKeyRangeCollector::with_limit(Some(0)).is_none());
1148
1149 let collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
1151 assert!(
1152 collector
1153 .finish(&encode_full_key(VirtualNode::ZERO, b"any"))
1154 .is_none()
1155 );
1156
1157 let vnode = VirtualNode::from_index(5);
1159 let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
1160 let a = encode_full_key(vnode, b"a");
1161 let b = encode_full_key(vnode, b"b");
1162 let c = encode_full_key(vnode, b"c");
1163 collector.observe_key(vnode, &a, &[]);
1164 collector.observe_key(vnode, &b, &a);
1165 collector.observe_key(vnode, &c, &b);
1166 assert!(
1167 collector.finish(&c).is_none(),
1168 "Single-vnode SST should not emit hints"
1169 );
1170
1171 let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1)).unwrap();
1176 let vnode_1 = VirtualNode::from_index(1);
1177 let vnode_2 = VirtualNode::from_index(2);
1178 let key1 = encode_full_key(vnode_1, b"key1");
1179 let key2 = encode_full_key(vnode_1, b"key2");
1180 let key3 = encode_full_key(vnode_2, b"key3");
1181 collector.observe_key(vnode_1, &key1, &[]);
1182 collector.observe_key(vnode_1, &key2, &key1);
1183 collector.observe_key(vnode_2, &key3, &key2); assert!(
1185 collector.finish(&key3).is_none(),
1186 "Only 1 vnode collected, should return None"
1187 );
1188 }
1189
1190 #[tokio::test]
1191 async fn test_basic() {
1192 let opt = default_builder_opt_for_test();
1193
1194 let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
1195 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
1196 let mut b = SstableBuilder::for_test(
1197 0,
1198 mock_sst_writer(&opt),
1199 opt,
1200 table_id_to_vnode,
1201 table_id_to_watermark_serde,
1202 );
1203
1204 for i in 0..TEST_KEYS_COUNT {
1205 b.add_for_test(
1206 test_key_of(i).to_ref(),
1207 HummockValue::put(&test_value_of(i)),
1208 )
1209 .await
1210 .unwrap();
1211 }
1212
1213 let output = b.finish().await.unwrap();
1214 let info = output.sst_info.sst_info;
1215
1216 assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
1217 assert_bytes_eq!(
1218 test_key_of(TEST_KEYS_COUNT - 1).encode(),
1219 info.key_range.right
1220 );
1221 let (data, meta) = output.writer_output;
1222 assert_eq!(info.file_size, meta.estimated_size as u64);
1223 let offset = info.meta_offset as usize;
1224 let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
1225 assert_eq!(meta2, meta);
1226 }
1227
1228 async fn test_with_bloom_filter<F: FilterBuilder>(with_blooms: bool) {
1229 let key_count = 1000;
1230
1231 let opts = SstableBuilderOptions {
1232 capacity: 0,
1233 block_capacity: 4096,
1234 restart_interval: 16,
1235 bloom_false_positive: if with_blooms { 0.01 } else { 0.0 },
1236 ..Default::default()
1237 };
1238
1239 let sstable_store = mock_sstable_store().await;
1241 let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
1242 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
1243 let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
1244 opts,
1245 0,
1246 (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
1247 sstable_store.clone(),
1248 CachePolicy::NotFill,
1249 table_id_to_vnode,
1250 table_id_to_watermark_serde,
1251 )
1252 .await;
1253 let table = sstable_store
1254 .sstable(&sst_info, &mut StoreLocalStatistic::default())
1255 .await
1256 .unwrap();
1257
1258 assert_eq!(table.has_bloom_filter(), with_blooms);
1259 for i in 0..key_count {
1260 let full_key = test_key_of(i);
1261 if table.has_bloom_filter() {
1262 let hash = Sstable::hash_for_bloom_filter(full_key.user_key.encode().as_slice(), 0);
1263 let key_ref = full_key.user_key.as_ref();
1264 assert!(
1265 table.may_match_hash(
1266 &(Bound::Included(key_ref), Bound::Included(key_ref)),
1267 hash
1268 ),
1269 "failed at {}",
1270 i
1271 );
1272 }
1273 }
1274 }
1275
    #[tokio::test]
    async fn test_bloom_filter() {
        // Exercise the SST build with the filter disabled (`false`) and enabled
        // (`true`) across the supported filter-builder implementations.
        test_with_bloom_filter::<Xor16FilterBuilder>(false).await;
        test_with_bloom_filter::<Xor16FilterBuilder>(true).await;
        test_with_bloom_filter::<Xor8FilterBuilder>(true).await;
        test_with_bloom_filter::<BlockedXor16FilterBuilder>(true).await;
    }
1283
1284 #[tokio::test]
1285 async fn test_no_bloom_filter_block() {
1286 let opts = SstableBuilderOptions::default();
1287 let sstable_store = mock_sstable_store().await;
1289 let writer_opts = SstableWriterOptions::default();
1290 let object_id = 1;
1291 let writer = sstable_store
1292 .clone()
1293 .create_sst_writer(object_id, writer_opts);
1294 let mut filter = MultiFilterKeyExtractor::default();
1295 filter.register(
1296 1.into(),
1297 FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
1298 );
1299 filter.register(
1300 2.into(),
1301 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
1302 );
1303 filter.register(
1304 3.into(),
1305 FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
1306 );
1307
1308 let table_id_to_vnode = HashMap::from_iter(vec![
1309 (1.into(), VirtualNode::COUNT_FOR_TEST),
1310 (2.into(), VirtualNode::COUNT_FOR_TEST),
1311 (3.into(), VirtualNode::COUNT_FOR_TEST),
1312 ]);
1313 let table_id_to_watermark_serde =
1314 HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);
1315
1316 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1317 FilterKeyExtractorImpl::Multi(filter),
1318 table_id_to_vnode,
1319 table_id_to_watermark_serde,
1320 HashMap::default(),
1321 ));
1322
1323 let mut builder = SstableBuilder::new(
1324 object_id,
1325 writer,
1326 BlockedXor16FilterBuilder::new(1024),
1327 opts,
1328 compaction_catalog_agent_ref,
1329 None,
1330 );
1331
1332 let key_count: usize = 10000;
1333 for table_id in 1..4 {
1334 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
1335 for idx in 0..key_count {
1336 table_key.resize(VirtualNode::SIZE, 0);
1337 table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
1338 let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
1339 let v = test_value_of(idx);
1340 builder
1341 .add(
1342 FullKey::from_user_key(k, test_epoch(1)),
1343 HummockValue::put(v.as_ref()),
1344 )
1345 .await
1346 .unwrap();
1347 }
1348 }
1349 let ret = builder.finish().await.unwrap();
1350 let sst_info = ret.sst_info.sst_info.clone();
1351 ret.writer_output.await.unwrap().unwrap();
1352 let table = sstable_store
1353 .sstable(&sst_info, &mut StoreLocalStatistic::default())
1354 .await
1355 .unwrap();
1356 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
1357 for idx in 0..key_count {
1358 table_key.resize(VirtualNode::SIZE, 0);
1359 table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
1360 let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
1361 let hash = Sstable::hash_for_bloom_filter(&k.encode(), 2);
1362 let key_ref = k.as_ref();
1363 assert!(
1364 table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
1365 );
1366 }
1367 }
1368}