risingwave_storage/hummock/sstable/
builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::{Bytes, BytesMut};
20use risingwave_common::util::row_serde::OrderedRowSerde;
21use risingwave_hummock_sdk::compaction_group::StateTableId;
22use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, user_key};
23use risingwave_hummock_sdk::key_range::KeyRange;
24use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
25use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
26use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
27use risingwave_pb::hummock::BloomFilterType;
28
29use super::utils::CompressionAlgorithm;
30use super::{
31    BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
32    DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
33};
34use crate::compaction_catalog_manager::{
35    CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
36    FullKeyFilterKeyExtractor,
37};
38use crate::hummock::sstable::{FilterBuilder, utils};
39use crate::hummock::value::HummockValue;
40use crate::hummock::{
41    Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
42};
43use crate::monitor::CompactorMetrics;
44use crate::opts::StorageOpts;
45
/// Default target size of one sstable: 4 MiB.
pub const DEFAULT_SSTABLE_SIZE: usize = 4 * 1024 * 1024;
/// Default false-positive probability for the sstable bloom filter.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
/// Default upper bound for the size of a single sst: 512 MiB.
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 * 1024 * 1024;
/// Threshold (8 KiB) below which a pending block is considered too small to
/// stand alone; see `add_raw_block`'s merge path.
pub const MIN_BLOCK_SIZE: usize = 8 * 1024;
50
#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Approximate sstable capacity.
    pub capacity: usize,
    /// Approximate block capacity.
    pub block_capacity: usize,
    /// Restart point interval.
    pub restart_interval: usize,
    /// False positive probability of bloom filter.
    pub bloom_false_positive: f64,
    /// Compression algorithm.
    pub compression_algorithm: CompressionAlgorithm,
    /// Upper bound on the total size of a single sst produced by compaction.
    pub max_sst_size: u64,
}
65
66impl From<&StorageOpts> for SstableBuilderOptions {
67    fn from(options: &StorageOpts) -> SstableBuilderOptions {
68        let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
69        SstableBuilderOptions {
70            capacity,
71            block_capacity: (options.block_size_kb as usize) * (1 << 10),
72            restart_interval: DEFAULT_RESTART_INTERVAL,
73            bloom_false_positive: options.bloom_false_positive,
74            compression_algorithm: CompressionAlgorithm::None,
75            max_sst_size: options.compactor_max_sst_size,
76        }
77    }
78}
79
80impl Default for SstableBuilderOptions {
81    fn default() -> Self {
82        Self {
83            capacity: DEFAULT_SSTABLE_SIZE,
84            block_capacity: DEFAULT_BLOCK_SIZE,
85            restart_interval: DEFAULT_RESTART_INTERVAL,
86            bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
87            compression_algorithm: CompressionAlgorithm::None,
88            max_sst_size: DEFAULT_MAX_SST_SIZE,
89        }
90    }
91}
92
/// Everything produced by `SstableBuilder::finish`.
pub struct SstableBuilderOutput<WO> {
    /// The sst metadata plus per-table stats, handed to the meta service.
    pub sst_info: LocalSstableInfo,
    /// Whatever the underlying `SstableWriter` yields on finish.
    pub writer_output: WO,
    /// Builder-level statistics for metrics reporting.
    pub stats: SstableBuilderOutputStats,
}
98
/// Streaming builder for a single sstable: entries are added in order, blocks
/// are flushed to `writer` as they fill, and `finish` emits the metadata.
pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Options.
    options: SstableBuilderOptions,
    /// Data writer.
    writer: W,
    /// Current block builder.
    block_builder: BlockBuilder,

    /// Supplies the filter-key extractor used when feeding `filter_builder`.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Block metadata vec.
    block_metas: Vec<BlockMeta>,

    /// `table_id` of added keys.
    table_ids: BTreeSet<u32>,
    /// Encoded full key of the most recently added entry.
    last_full_key: Vec<u8>,
    /// Buffer for encoded key and value to avoid allocation.
    raw_key: BytesMut,
    raw_value: BytesMut,
    /// `table_id` of the most recently added key; `None` before the first add.
    last_table_id: Option<u32>,
    /// Object id under which this sst will be stored.
    sst_object_id: HummockSstableObjectId,

    /// Per table stats.
    table_stats: TableStatsMap,
    /// `last_table_stats` accumulates stats for `last_table_id` and finalizes it in `table_stats`
    /// by `finalize_last_table_stats`
    last_table_stats: TableStats,

    /// Builds the bloom/xor filter from the extracted filter keys.
    filter_builder: F,

    /// Distinct pure epochs seen; ordered, so min/max are first/last.
    epoch_set: BTreeSet<u64>,
    /// Optional limiter handed to the filter builder on block switch/finish.
    memory_limiter: Option<Arc<MemoryLimiter>>,

    block_size_vec: Vec<usize>, // for statistics
}
133
134impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
135    pub fn for_test(
136        sstable_id: u64,
137        writer: W,
138        options: SstableBuilderOptions,
139        table_id_to_vnode: HashMap<StateTableId, usize>,
140        table_id_to_watermark_serde: HashMap<
141            u32,
142            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
143        >,
144    ) -> Self {
145        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
146            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
147            table_id_to_vnode,
148            table_id_to_watermark_serde,
149        ));
150
151        Self::new(
152            sstable_id,
153            writer,
154            Xor16FilterBuilder::new(options.capacity / DEFAULT_ENTRY_SIZE + 1),
155            options,
156            compaction_catalog_agent_ref,
157            None,
158        )
159    }
160}
161
162impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
    /// Creates a builder that streams blocks into `writer` as they fill.
    ///
    /// `memory_limiter` (if any) is forwarded to the filter builder when
    /// blocks are switched and when the filter is finalized.
    pub fn new(
        sst_object_id: impl Into<HummockSstableObjectId>,
        writer: W,
        filter_builder: F,
        options: SstableBuilderOptions,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        memory_limiter: Option<Arc<MemoryLimiter>>,
    ) -> Self {
        let sst_object_id = sst_object_id.into();
        Self {
            options: options.clone(),
            writer,
            block_builder: BlockBuilder::new(BlockBuilderOptions {
                capacity: options.block_capacity,
                restart_interval: options.restart_interval,
                compression_algorithm: options.compression_algorithm,
            }),
            filter_builder,
            // Pre-size for the expected number of blocks in a full sstable.
            block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
            table_ids: BTreeSet::new(),
            last_table_id: None,
            raw_key: BytesMut::new(),
            raw_value: BytesMut::new(),
            last_full_key: vec![],
            sst_object_id,
            compaction_catalog_agent_ref,
            table_stats: Default::default(),
            last_table_stats: Default::default(),
            epoch_set: BTreeSet::default(),
            memory_limiter,
            block_size_vec: Vec::new(),
        }
    }
196
    /// Add kv pair to sstable. Test-only convenience that delegates to
    /// [`Self::add`].
    pub async fn add_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add(full_key, value).await
    }
205
    /// Approximate encoded size of the block currently being built.
    pub fn current_block_size(&self) -> usize {
        self.block_builder.approximate_len()
    }
209
    /// Add raw data of block to sstable. return false means fallback
    ///
    /// On the fallback path the compressed block is decoded and its entries
    /// are re-added one by one through `add_impl`, so the caller must fall
    /// back to normal (entry-wise) building for this block.
    pub async fn add_raw_block(
        &mut self,
        buf: Bytes,
        filter_data: Vec<u8>,
        smallest_key: FullKey<Vec<u8>>,
        largest_key: Vec<u8>,
        mut meta: BlockMeta,
    ) -> HummockResult<bool> {
        let table_id = smallest_key.user_key.table_id.table_id;
        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
            if !self.block_builder.is_empty() {
                // Try to finish the previous `Block` when the `table_id` is switched, making sure that the data in the `Block` doesn't span two `table_ids`.
                self.build_block().await?;
            }

            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
        }

        if !self.block_builder.is_empty() {
            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);

            // If the pending block is too small, merge the incoming raw
            // block's entries into it instead of emitting a tiny block.
            if self.block_builder.approximate_len() < min_block_size {
                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
                iter.seek_to_first();
                while iter.is_valid() {
                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
                        panic!(
                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
                            self.sst_object_id, self.block_metas.len(), self.last_table_id
                        )
                    });
                    // `could_switch_block = false`: entries coming from one
                    // raw block must not be split across output blocks.
                    self.add_impl(iter.key(), value, false).await?;
                    iter.next();
                }
                return Ok(false);
            }

            self.build_block().await?;
        }
        self.last_full_key = largest_key;
        assert_eq!(
            meta.len as usize,
            buf.len(),
            "meta {} buf {} last_table_id {:?}",
            meta.len,
            buf.len(),
            self.last_table_id
        );
        // Re-point the meta at this sst's current write position before
        // appending the raw bytes.
        meta.offset = self.writer.data_len() as u32;
        self.block_metas.push(meta);
        self.filter_builder.add_raw_data(filter_data);
        let block_meta = self.block_metas.last_mut().unwrap();
        self.writer.write_block_bytes(buf, block_meta).await?;

        Ok(true)
    }
271
    /// Add kv pair to sstable.
    ///
    /// Delegates to `add_impl` with block switching allowed.
    pub async fn add(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add_impl(full_key, value, true).await
    }
280
    /// Add kv pair to sstable.
    ///
    /// `could_switch_block` is `false` only when re-adding entries from a
    /// rejected raw block (see `add_raw_block`), where splitting the entries
    /// across output blocks is not allowed.
    async fn add_impl(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        could_switch_block: bool,
    ) -> HummockResult<()> {
        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;

        let table_key_len = full_key.user_key.table_key.as_ref().len();
        let table_value_len = match &value {
            HummockValue::Put(t) => t.len(),
            HummockValue::Delete => 0,
        };
        // Log (but still accept) unusually large keys/values, which can blow
        // past block/sst size targets.
        let large_value_len = self.options.max_sst_size as usize / 10;
        let large_key_value_len = self.options.max_sst_size as usize / 2;
        if table_key_len >= LARGE_KEY_LEN
            || table_value_len > large_value_len
            || table_key_len + table_value_len > large_key_value_len
        {
            let table_id = full_key.user_key.table_id.table_id();
            tracing::warn!(
                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
                table_id,
                table_key_len,
                table_value_len,
                full_key.epoch_with_gap.pure_epoch(),
                full_key.epoch_with_gap.offset(),
            );
        }

        // TODO: refine me
        full_key.encode_into(&mut self.raw_key);
        value.encode(&mut self.raw_value);
        let is_new_user_key = self.last_full_key.is_empty()
            || !user_key(&self.raw_key).eq(user_key(&self.last_full_key));
        let table_id = full_key.user_key.table_id.table_id();
        let is_new_table = self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id;
        let current_block_size = self.current_block_size();
        // A block is considered full at capacity, or above 3/4 capacity when
        // this entry would push it over.
        let is_block_full = current_block_size >= self.options.block_capacity
            || (current_block_size > self.options.block_capacity / 4 * 3
                && current_block_size + self.raw_value.len() + self.raw_key.len()
                    > self.options.block_capacity);

        if is_new_table {
            // A table switch always forces a block switch, so it must be
            // permitted here.
            assert!(
                could_switch_block,
                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
                is_new_user_key,
                self.sst_object_id,
                self.block_metas.len(),
                table_id,
                self.last_table_id,
                full_key
            );
            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }
        } else if is_block_full && could_switch_block {
            self.build_block().await?;
        }
        self.last_table_stats.total_key_count += 1;
        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());

        // Rotate block builder if the previous one has been built.
        if self.block_builder.is_empty() {
            self.block_metas.push(BlockMeta {
                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
                    panic!(
                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
                        self.writer.data_len(),
                        self.sst_object_id,
                        self.block_metas.len(),
                        self.table_ids,
                    )
                }),
                // `len` and `uncompressed_size` are filled in by `build_block`.
                len: 0,
                smallest_key: full_key.encode(),
                uncompressed_size: 0,
                total_key_count: 0,
                stale_key_count: 0,
            });
        }

        let table_id = full_key.user_key.table_id.table_id();
        let mut extract_key = user_key(&self.raw_key);
        extract_key = self.compaction_catalog_agent_ref.extract(extract_key);
        // add bloom_filter check
        if !extract_key.is_empty() {
            self.filter_builder.add_key(extract_key, table_id);
        }
        self.block_builder.add(full_key, self.raw_value.as_ref());
        self.block_metas.last_mut().unwrap().total_key_count += 1;
        // Older versions of a user key, and deletes, count as stale.
        if !is_new_user_key || value.is_delete() {
            self.block_metas.last_mut().unwrap().stale_key_count += 1;
        }
        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
        self.last_table_stats.total_value_size += value.encoded_len() as i64;

        self.last_full_key.clear();
        self.last_full_key.extend_from_slice(&self.raw_key);

        self.raw_key.clear();
        self.raw_value.clear();
        Ok(())
    }
390
    /// Finish building sst.
    ///
    /// Flushes the pending block, finalizes the bloom filter and per-table
    /// stats, and hands the assembled `SstableMeta` to the writer.
    ///
    /// # Format
    ///
    /// data:
    ///
    /// ```plain
    /// | Block 0 | ... | Block N-1 | N (4B) |
    /// ```
    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
        let smallest_key = if self.block_metas.is_empty() {
            vec![]
        } else {
            self.block_metas[0].smallest_key.clone()
        };
        let largest_key = self.last_full_key.clone();
        self.finalize_last_table_stats();

        // Flush the in-progress block (no-op when it is empty).
        self.build_block().await?;
        let right_exclusive = false;
        let meta_offset = self.writer.data_len() as u64;

        let bloom_filter_kind = if self.filter_builder.support_blocked_raw_data() {
            BloomFilterType::Blocked
        } else {
            BloomFilterType::Sstable
        };
        // A non-positive false-positive rate disables the bloom filter.
        let bloom_filter = if self.options.bloom_false_positive > 0.0 {
            self.filter_builder.finish(self.memory_limiter.clone())
        } else {
            vec![]
        };

        let total_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.total_key_count as u64)
            .sum::<u64>();
        let stale_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.stale_key_count as u64)
            .sum::<u64>();
        let uncompressed_file_size = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.uncompressed_size as u64)
            .sum::<u64>();

        #[expect(deprecated)]
        let mut meta = SstableMeta {
            block_metas: self.block_metas,
            bloom_filter,
            // Filled in below once the encoded meta size is known.
            estimated_size: 0,
            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
                    total_key_count, self.table_ids,
                )
            }),
            smallest_key,
            largest_key,
            version: VERSION,
            meta_offset,
            monotonic_tombstone_events: vec![],
        };

        let meta_encode_size = meta.encoded_size();
        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
                meta_encode_size, self.table_ids,
            )
        });
        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
                meta_offset, self.table_ids,
            )
        });
        // estimated_size = data section (meta_offset) + encoded meta section.
        meta.estimated_size = encoded_size_u32
            .checked_add(meta_offset_u32)
            .unwrap_or_else(|| {
                panic!(
                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
                )
            });

        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
            (0, 0)
        } else {
            let total_key_count: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_count as usize)
                .sum();

            if total_key_count == 0 {
                (0, 0)
            } else {
                let total_key_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_key_size as usize)
                    .sum();

                let total_value_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_value_size as usize)
                    .sum();

                (
                    total_key_size / total_key_count,
                    total_value_size / total_key_count,
                )
            }
        };

        // `epoch_set` is an ordered set, so first/last are min/max.
        let (min_epoch, max_epoch) = {
            if self.epoch_set.is_empty() {
                (HummockEpoch::MAX, u64::MIN)
            } else {
                (
                    *self.epoch_set.first().unwrap(),
                    *self.epoch_set.last().unwrap(),
                )
            }
        };

        let sst_info: SstableInfo = SstableInfoInner {
            object_id: self.sst_object_id,
            // use the same sst_id as object_id for initial sst
            sst_id: self.sst_object_id.inner().into(),
            bloom_filter_kind,
            key_range: KeyRange {
                left: Bytes::from(meta.smallest_key.clone()),
                right: Bytes::from(meta.largest_key.clone()),
                right_exclusive,
            },
            file_size: meta.estimated_size as u64,
            table_ids: self.table_ids.into_iter().collect(),
            meta_offset: meta.meta_offset,
            stale_key_count,
            total_key_count,
            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
            min_epoch,
            max_epoch,
            range_tombstone_count: 0,
            sst_size: meta.estimated_size as u64,
        }
        .into();
        tracing::trace!(
            "meta_size {} bloom_filter_size {}  add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
            meta.encoded_size(),
            meta.bloom_filter.len(),
            total_key_count,
            stale_key_count,
            min_epoch,
            max_epoch,
            self.epoch_set.len()
        );
        let bloom_filter_size = meta.bloom_filter.len();
        let sstable_file_size = sst_info.file_size as usize;

        if !meta.block_metas.is_empty() {
            // fill total_compressed_size
            let mut last_table_id = meta.block_metas[0].table_id().table_id();
            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
            for block_meta in &meta.block_metas {
                let block_table_id = block_meta.table_id();
                if last_table_id != block_table_id.table_id() {
                    last_table_id = block_table_id.table_id();
                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
                }

                last_table_stats.total_compressed_size += block_meta.len as u64;
            }
        }

        let writer_output = self.writer.finish(meta).await?;
        // The timestamp is only used during full GC.
        //
        // Ideally object store object's last_modified should be used.
        // However, it'll incur additional IO overhead since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
        //
        // The local timestamp below is expected to precede the last_modified of object store object, given that the object store object is created afterward.
        // It should help alleviate the clock drift issue.

        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        Ok(SstableBuilderOutput::<W::Output> {
            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
            writer_output,
            stats: SstableBuilderOutputStats {
                bloom_filter_size,
                avg_key_size,
                avg_value_size,
                epoch_count: self.epoch_set.len(),
                block_size_vec: self.block_size_vec,
                sstable_file_size,
            },
        })
    }
598
599    pub fn approximate_len(&self) -> usize {
600        self.writer.data_len()
601            + self.block_builder.approximate_len()
602            + self.filter_builder.approximate_len()
603    }
604
    /// Compresses and writes out the block currently being built, filling in
    /// its `BlockMeta` (uncompressed size and compressed length) in place.
    pub async fn build_block(&mut self) -> HummockResult<()> {
        // Skip empty block.
        if self.block_builder.is_empty() {
            return Ok(());
        }

        // The meta entry for this block was pushed by `add_impl` when the
        // block's first key was added.
        let block_meta = self.block_metas.last_mut().unwrap();
        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
            .unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
                    uncompressed_block_size,
                    self.block_builder.table_id(),
                )
            });
        let block = self.block_builder.build();
        self.writer.write_block(block, block_meta).await?;
        self.block_size_vec.push(block.len());
        self.filter_builder
            .switch_block(self.memory_limiter.clone());
        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
                self.writer.data_len(),
                self.block_builder.table_id(),
            )
        });
        // Compressed length = writer position after the write minus the
        // block's starting offset.
        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
            panic!(
                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
                data_len, block_meta.offset
            )
        });

        if data_len as usize > self.options.capacity * 2 {
            tracing::warn!(
                "WARN unexpected block size {} table {:?}",
                data_len,
                self.block_builder.table_id()
            );
        }

        self.block_builder.clear();
        Ok(())
    }
651
652    pub fn is_empty(&self) -> bool {
653        self.writer.data_len() > 0
654    }
655
    /// Returns true if we roughly reached capacity
    /// (compares [`Self::approximate_len`] against `options.capacity`).
    pub fn reach_capacity(&self) -> bool {
        self.approximate_len() >= self.options.capacity
    }
660
661    fn finalize_last_table_stats(&mut self) {
662        if self.table_ids.is_empty() || self.last_table_id.is_none() {
663            return;
664        }
665        self.table_stats.insert(
666            self.last_table_id.unwrap(),
667            std::mem::take(&mut self.last_table_stats),
668        );
669    }
670}
671
/// Builder-level statistics collected while writing one sstable; reported to
/// `CompactorMetrics` via `report_stats`.
pub struct SstableBuilderOutputStats {
    /// Encoded bloom filter size in bytes (0 when the filter is disabled).
    bloom_filter_size: usize,
    /// Average encoded key size over all entries (0 when there are none).
    avg_key_size: usize,
    /// Average encoded value size over all entries (0 when there are none).
    avg_value_size: usize,
    /// Number of distinct epochs seen by the builder.
    epoch_count: usize,
    block_size_vec: Vec<usize>, // for statistics
    /// Estimated total sst file size in bytes.
    sstable_file_size: usize,
}
680
681impl SstableBuilderOutputStats {
682    pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
683        if self.bloom_filter_size != 0 {
684            metrics
685                .sstable_bloom_filter_size
686                .observe(self.bloom_filter_size as _);
687        }
688
689        if self.sstable_file_size != 0 {
690            metrics
691                .sstable_file_size
692                .observe(self.sstable_file_size as _);
693        }
694
695        if self.avg_key_size != 0 {
696            metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
697        }
698
699        if self.avg_value_size != 0 {
700            metrics
701                .sstable_avg_value_size
702                .observe(self.avg_value_size as _);
703        }
704
705        if self.epoch_count != 0 {
706            metrics
707                .sstable_distinct_epoch_count
708                .observe(self.epoch_count as _);
709        }
710
711        if !self.block_size_vec.is_empty() {
712            for block_size in &self.block_size_vec {
713                metrics.sstable_block_size.observe(*block_size as _);
714            }
715        }
716    }
717}
718
719#[cfg(test)]
720pub(super) mod tests {
721    use std::collections::{Bound, HashMap};
722
723    use risingwave_common::catalog::TableId;
724    use risingwave_common::hash::VirtualNode;
725    use risingwave_common::util::epoch::test_epoch;
726    use risingwave_hummock_sdk::key::UserKey;
727
728    use super::*;
729    use crate::assert_bytes_eq;
730    use crate::compaction_catalog_manager::{
731        CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
732    };
733    use crate::hummock::iterator::test_utils::mock_sstable_store;
734    use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
735    use crate::hummock::test_utils::{
736        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
737        test_key_of, test_value_of,
738    };
739    use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
740    use crate::monitor::StoreLocalStatistic;
741
    /// Finishing a builder without ever adding a key must succeed.
    #[tokio::test]
    async fn test_empty() {
        let opt = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: 0.001,
            ..Default::default()
        };

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        b.finish().await.unwrap();
    }
764
    /// Round-trips a small sstable: the reported key range must match the
    /// first/last added keys, and the encoded meta must decode back to itself.
    #[tokio::test]
    async fn test_basic() {
        let opt = default_builder_opt_for_test();

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let mut b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        for i in 0..TEST_KEYS_COUNT {
            b.add_for_test(
                test_key_of(i).to_ref(),
                HummockValue::put(&test_value_of(i)),
            )
            .await
            .unwrap();
        }

        let output = b.finish().await.unwrap();
        let info = output.sst_info.sst_info;

        assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
        assert_bytes_eq!(
            test_key_of(TEST_KEYS_COUNT - 1).encode(),
            info.key_range.right
        );
        let (data, meta) = output.writer_output;
        assert_eq!(info.file_size, meta.estimated_size as u64);
        let offset = info.meta_offset as usize;
        // The meta section must decode from the data buffer at `meta_offset`.
        let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
        assert_eq!(meta2, meta);
    }
802
    /// Builds an sstable with (or without) a bloom filter using filter builder
    /// `F`, then checks that every probed key may-matches.
    ///
    /// NOTE(review): `key_count` (1000) differs from `TEST_KEYS_COUNT` used to
    /// generate the data; probing a subset is presumably intentional — confirm.
    async fn test_with_bloom_filter<F: FilterBuilder>(with_blooms: bool) {
        let key_count = 1000;

        let opts = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: if with_blooms { 0.01 } else { 0.0 },
            ..Default::default()
        };

        // build remote table
        let sstable_store = mock_sstable_store().await;
        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
            opts,
            0,
            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        )
        .await;
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();

        assert_eq!(table.has_bloom_filter(), with_blooms);
        for i in 0..key_count {
            let full_key = test_key_of(i);
            if table.has_bloom_filter() {
                // A bloom filter must never report a false negative for a
                // present key.
                let hash = Sstable::hash_for_bloom_filter(full_key.user_key.encode().as_slice(), 0);
                let key_ref = full_key.user_key.as_ref();
                assert!(
                    table.may_match_hash(
                        &(Bound::Included(key_ref), Bound::Included(key_ref)),
                        hash
                    ),
                    "failed at {}",
                    i
                );
            }
        }
    }
850
    /// Exercises `test_with_bloom_filter` across all filter-builder
    /// implementations, both with the bloom filter disabled and enabled.
    #[tokio::test]
    async fn test_bloom_filter() {
        // `false` sets `bloom_false_positive` to 0.0, which disables the filter.
        test_with_bloom_filter::<Xor16FilterBuilder>(false).await;
        // With filters enabled, every builder variant must pass the same checks.
        test_with_bloom_filter::<Xor16FilterBuilder>(true).await;
        test_with_bloom_filter::<Xor8FilterBuilder>(true).await;
        test_with_bloom_filter::<BlockedXor16FilterBuilder>(true).await;
    }
858
859    #[tokio::test]
860    async fn test_no_bloom_filter_block() {
861        let opts = SstableBuilderOptions::default();
862        // build remote table
863        let sstable_store = mock_sstable_store().await;
864        let writer_opts = SstableWriterOptions::default();
865        let object_id = 1;
866        let writer = sstable_store
867            .clone()
868            .create_sst_writer(object_id, writer_opts);
869        let mut filter = MultiFilterKeyExtractor::default();
870        filter.register(1, FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor));
871        filter.register(
872            2,
873            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
874        );
875        filter.register(3, FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor));
876
877        let table_id_to_vnode = HashMap::from_iter(vec![
878            (1, VirtualNode::COUNT_FOR_TEST),
879            (2, VirtualNode::COUNT_FOR_TEST),
880            (3, VirtualNode::COUNT_FOR_TEST),
881        ]);
882        let table_id_to_watermark_serde = HashMap::from_iter(vec![(1, None), (2, None), (3, None)]);
883
884        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
885            FilterKeyExtractorImpl::Multi(filter),
886            table_id_to_vnode,
887            table_id_to_watermark_serde,
888        ));
889
890        let mut builder = SstableBuilder::new(
891            object_id,
892            writer,
893            BlockedXor16FilterBuilder::new(1024),
894            opts,
895            compaction_catalog_agent_ref,
896            None,
897        );
898
899        let key_count: usize = 10000;
900        for table_id in 1..4 {
901            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
902            for idx in 0..key_count {
903                table_key.resize(VirtualNode::SIZE, 0);
904                table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
905                let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
906                let v = test_value_of(idx);
907                builder
908                    .add(
909                        FullKey::from_user_key(k, test_epoch(1)),
910                        HummockValue::put(v.as_ref()),
911                    )
912                    .await
913                    .unwrap();
914            }
915        }
916        let ret = builder.finish().await.unwrap();
917        let sst_info = ret.sst_info.sst_info.clone();
918        ret.writer_output.await.unwrap().unwrap();
919        let table = sstable_store
920            .sstable(&sst_info, &mut StoreLocalStatistic::default())
921            .await
922            .unwrap();
923        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
924        for idx in 0..key_count {
925            table_key.resize(VirtualNode::SIZE, 0);
926            table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
927            let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
928            let hash = Sstable::hash_for_bloom_filter(&k.encode(), 2);
929            let key_ref = k.as_ref();
930            assert!(
931                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
932            );
933        }
934    }
935}